Introduction
Graphile Worker RS is a PostgreSQL-backed job queue for Rust applications. It lets an application store durable work in PostgreSQL, then process that work in background workers instead of blocking request handling or other latency-sensitive paths.
Use it for work such as sending email, generating documents, running calculations, synchronizing data, cleaning up records, or moving jobs produced by PostgreSQL triggers and functions into Rust code. Jobs are stored in PostgreSQL, task handlers run in Rust, and the worker coordinates fetching, execution, retries, queues, and scheduling.
Relationship to Graphile Worker
Graphile Worker RS is based on the Node.js Graphile Worker project. The core model is the same: PostgreSQL is the durable queue, workers claim available jobs, task identifiers choose the handler, and job metadata controls scheduling, priority, attempts, and queue behavior.
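To make the claiming model concrete, here is a minimal in-memory sketch of how a worker might pick its next job. It skips rows that are locked or not yet due, then prefers the lowest priority number, then the earliest run_at, then the lowest id. The Job struct, the claim_next function, and that exact ordering are illustrative assumptions modeled on Graphile Worker's documented behavior. The real selection happens in SQL with row locks and SKIP LOCKED, not in Rust.

```rust
/// Hypothetical, simplified view of one row in the jobs table.
#[derive(Debug, Clone)]
pub struct Job {
    pub id: u64,
    pub task_identifier: &'static str,
    pub priority: i16,               // lower numbers run first
    pub run_at: u64,                 // seconds since an arbitrary epoch
    pub locked_by: Option<String>,   // worker id holding the lock, if any
}

/// Claims the next runnable job for `worker_id`. Locked rows are skipped
/// rather than waited on, which is what SKIP LOCKED gives you in PostgreSQL.
/// Returns the id of the claimed job, or None if nothing is runnable.
pub fn claim_next(jobs: &mut [Job], worker_id: &str, now: u64) -> Option<u64> {
    let next = jobs
        .iter_mut()
        .filter(|job| job.locked_by.is_none() && job.run_at <= now)
        .min_by_key(|job| (job.priority, job.run_at, job.id))?;
    // Mark the row as ours before returning, so no other caller can claim it.
    next.locked_by = Some(worker_id.to_string());
    Some(next.id)
}
```

Because a claim locks the row before it is returned, each job is handed to at most one worker; PostgreSQL provides the same guarantee across processes.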
This project is not a Node wrapper. It is a Rust rewrite with Rust APIs,
workspace crates, feature flags, and async runtime integration. The root crate is
graphile_worker, and the workspace contains focused crates for task handlers,
job specifications, context, migrations, cron parsing and running, lifecycle
hooks, recovery, database access, runtime adapters, the CLI, and admin tooling.
The Rust implementation is mostly compatible with the original Graphile Worker model, so it can be used alongside the Node.js worker where the shared database schema and job contracts match. One important implementation difference is worker identity: the Rust worker uses one worker id per worker instance, and jobs are processed by the enabled async runtime.
What It Solves
Graphile Worker RS is useful when PostgreSQL is already part of your system and you want background processing without operating a separate queue service. It provides:
- durable job storage in PostgreSQL
- efficient job claiming with PostgreSQL locking
- low-latency wakeups through PostgreSQL notifications
- typed Rust task handlers through TaskHandler
- scheduling from Rust through WorkerUtils or from SQL through graphile_worker.add_job
- delayed jobs, priorities, attempts, job keys, and serial queues
- cron-like recurring jobs through the crontab crates
- batch job insertion and batch task handling
- configurable worker concurrency
- graceful shutdown behavior for in-flight work
- optional worker recovery for jobs left locked by failed workers
- lifecycle hooks, tracing integration, CLI commands, and admin UI crates for operational visibility
The default feature set uses Tokio, rustls, and the SQLx PostgreSQL driver. Feature flags also expose async-std, native TLS, the tokio-postgres driver, and OpenTelemetry compatibility groups.
Guide and API Reference
This mdBook is the narrative guide. Read it when you want to understand how the pieces fit together, choose configuration, follow a task-oriented workflow, or prepare a worker for production.
Start with Getting Started to install the crate,
define a first TaskHandler, register it with WorkerOptions, and run a worker.
Then use Core Concepts for the mental model around tasks,
workers, scheduling, queues, and the database schema.
Use Configuration when selecting runtime, TLS, database driver, shutdown, recovery, and application state options. Use Guides for focused workflows such as scheduling jobs, batch jobs, cron jobs, local queue behavior, worker recovery, lifecycle hooks, and job management. Use Operations for the CLI, admin UI, migrations, deployment, and observability topics.
Use docs.rs when you need generated Rust API documentation: exact method signatures, trait bounds, enum variants, struct fields, feature-gated items, and crate-level rustdoc. The API Reference page links to the generated docs for the main crate and supporting crates, while Crate Map explains what each workspace crate is for.
For executable examples, start with examples/simple.rs, then look at
examples/crontab.rs, examples/batch_add_jobs.rs, examples/local_queue.rs,
and the hook examples. The integration tests under tests/ are also useful when
you need precise behavior for scheduling, queues, worker utilities, migrations,
recovery, and runtime driver paths.
Getting Started
Graphile Worker RS is a PostgreSQL-backed job queue for Rust applications. Use it when application work should move out of the request path: sending email, generating files, running calculations, scheduling follow-up work, or processing jobs created by PostgreSQL triggers and functions.
This section is an adoption roadmap. It points to the pages that help you move from deciding whether Graphile Worker RS fits your application to running your first worker and exploring the examples.
Adoption Roadmap
- Decide whether a PostgreSQL-backed queue is the right model for your work. Start with When to Use Graphile Worker RS.
- Add the crate and choose the runtime, TLS, and database driver features your application will use. See Installation.
- Run a minimal worker to prove your database connection, migrations, task registration, and job execution flow. Follow Quick Start.
- Turn that minimal worker into an application shape you can keep: define task payloads, register handlers, provide a PostgreSQL pool, and decide how the worker should run. See First Worker.
- Compare the repository examples with your use case. The Examples Tour is the best next stop once the basic worker is running.
What You Build First
A worker needs three application-level pieces:
- A serializable payload type.
- A TaskHandler implementation with a stable IDENTIFIER.
- WorkerOptions configured with a PostgreSQL pool and registered handlers.
The shape is visible in examples/simple.rs:
use graphile_worker::{IntoTaskHandlerResult, WorkerContext};
use graphile_worker_task_handler::TaskHandler;
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SayHello {
message: String,
}
impl TaskHandler for SayHello {
const IDENTIFIER: &'static str = "say_hello";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Hello {} !", self.message);
Ok::<(), String>(())
}
}
The worker is then configured with a connection pool, concurrency, schema, and registered task:
use graphile_worker::WorkerOptions;
let worker = WorkerOptions::default()
.concurrency(2)
.schema("example_simple_worker")
.define_job::<SayHello>()
.pg_pool(pg_pool)
.init()
.await?;
worker.run().await?;
Adding Work to the Queue
After initialization, create utilities from the worker and add jobs with a
payload and JobSpecBuilder. The simple example schedules a job for later:
use chrono::{offset::Utc, Duration};
use graphile_worker::JobSpecBuilder;
let helpers = worker.create_utils();
helpers
.add_job(
SayHello {
message: "world".to_string(),
},
JobSpecBuilder::new()
.run_at(Utc::now() + Duration::seconds(10))
.build(),
)
.await?;
That flow is enough to validate the core model: enqueue typed work, store it in PostgreSQL, and let the worker execute the matching handler.
Installation Choices
The default crate features enable Tokio, Rustls TLS, and the SQLx driver. A basic dependency setup looks like this:
[dependencies]
graphile_worker = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
The crate also exposes feature flags for runtime-async-std,
tls-native-tls, driver-tokio-postgres, and OpenTelemetry compatibility.
Use Installation first, then check
Runtime, TLS, and Drivers and
Feature Flags when your application needs a
non-default combination.
Where to Go Next
- When to Use Graphile Worker RS: confirm the queue model and tradeoffs.
- Installation: add the crate and choose features.
- Quick Start: run the smallest useful worker.
- First Worker: structure your first real task handler.
- Examples Tour: map repository examples to common use cases.
- Core Concepts: learn how tasks, workers, scheduling, queues, and the database schema fit together.
- Configuration: tune worker options, shutdown, runtime choices, and application state.
- Operations: prepare migrations, deployment, observability, CLI usage, and admin tooling.
When to Use Graphile Worker RS
Graphile Worker RS is a PostgreSQL-backed job queue for Rust applications. Use it when your application already depends on PostgreSQL and you want background work to be stored, coordinated, retried, and scheduled through the same database instead of a separate queue service.
It is based on Graphile Worker and is designed for jobs such as sending emails, performing calculations, generating PDFs, scheduling future work, and running recurring tasks.
Good Fit
Graphile Worker RS is a good fit when you need one or more of these properties:
- Background jobs that survive process restarts because they are persisted in PostgreSQL.
- Low-latency job pickup using PostgreSQL LISTEN/NOTIFY.
- Concurrent job processing coordinated through PostgreSQL SKIP LOCKED.
- Automatic retries for failed jobs, including exponential backoff.
- Scheduled jobs that should run later.
- Cron-like recurring jobs.
- Type-safe Rust task handlers with serde payloads.
- A worker that can run inside an existing Rust async application.
- Optional worker recovery for deployments where jobs may remain locked after a crash, network partition, forced abort, or orchestrator shutdown.
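The exponential backoff mentioned above can be pictured as a pure function of the attempt count. This sketch assumes the schedule used by the Node.js Graphile Worker, where a failed job's next run is pushed back by exp(min(attempts, 10)) seconds. Check the Rust crate's migrations if you need the exact formula; the retry_delay name is hypothetical.

```rust
use std::time::Duration;

/// Hypothetical helper: how long a job waits before its next attempt after
/// `attempts` failed attempts, assuming a delay of e^min(attempts, 10) seconds.
pub fn retry_delay(attempts: u32) -> Duration {
    // Cap the exponent so the delay stops growing after 10 attempts.
    let capped = attempts.min(10) as f64;
    Duration::from_secs_f64(capped.exp())
}
```

Under this schedule the delay grows from about 3 seconds after the first failure to roughly 6 hours (e^10 is about 22,026 seconds) once the cap is reached.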
The common case is an application that already writes application data to PostgreSQL and wants to enqueue follow-up work close to that data.
use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
subject: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("send '{}' to {}", self.subject, self.to);
Ok::<(), String>(())
}
}
Typical Use Cases
User-Facing Background Work
Move slow or unreliable work out of HTTP request handling:
- Send transactional email after signup or payment.
- Generate a PDF, image, report, or export after the user requests it.
- Perform expensive calculations after saving the primary database record.
- Notify another system without blocking the response path.
Scheduled Work
Use scheduled jobs when the job should run at a known time in the future:
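use chrono::{Duration, Utc};
use graphile_worker::JobSpecBuilder;
// `helpers` is the WorkerUtils value returned by worker.create_utils().
let job = SendEmail {
to: "customer@example.com".to_owned(),
subject: "Your report is ready".to_owned(),
};
let spec = JobSpecBuilder::new()
.run_at(Utc::now() + Duration::hours(1))
.build();
helpers.add_job(job, spec).await?;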
Recurring Work
Use cron-like recurring tasks for periodic maintenance or synchronization:
- Send daily reports.
- Reconcile records on a schedule.
- Poll an integration periodically.
- Clean up old data.
Graphile Worker RS exposes crontab parsing and cron runner types from the main crate, so recurring jobs can live beside the rest of your worker configuration.
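To illustrate what a recurring schedule with backfill covers, the sketch below lists the minutes an "every N minutes" schedule would have fired inside a fill window ending now. It assumes cron-style */n minute semantics, where the minute of the hour must be divisible by n, and it treats timestamps as whole minutes since the Unix epoch. The slots_in_window function is hypothetical and is not the crontab crates' API. A real cron runner also has to skip slots it has already executed, which this sketch does not track.

```rust
/// Hypothetical sketch: minutes (counted from the Unix epoch) at which an
/// "every `n` minutes" schedule would fire inside a backfill window of
/// `fill_minutes` ending at `now_minute`, oldest first.
pub fn slots_in_window(n: u64, now_minute: u64, fill_minutes: u64) -> Vec<u64> {
    assert!(n > 0, "interval must be positive");
    let start = now_minute.saturating_sub(fill_minutes);
    (start..=now_minute)
        // `*/n` in a cron minute field matches minutes-of-the-hour divisible by n,
        // so the pattern restarts at the top of every hour.
        .filter(|minute| (minute % 60) % n == 0)
        .collect()
}
```

For example, an every-7-minutes schedule fires at minute 56 and then again at minute 0 of the next hour, because */7 restarts each hour.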
Database-Centered Workflows
The crate description explicitly calls out jobs generated by PostgreSQL triggers or functions. This is useful when the database is the source of truth for state transitions and the application needs a Rust worker to process the resulting work queue.
Prerequisites
Before choosing Graphile Worker RS, make sure these assumptions match your project:
- You can provide a PostgreSQL database connection.
- Your application can run a Rust async runtime. Tokio is enabled by default, and async-std is available through the runtime-async-std feature.
- Your job payloads can be serialized and deserialized with serde.
- You can register every task handler with a stable identifier string.
- You are comfortable letting the worker manage its own PostgreSQL schema. The default schema name is graphile_worker.
- You can choose the database driver and TLS features that match your deployment. The default features include Tokio, rustls TLS, and the sqlx driver.
A minimal Tokio setup looks like this:
graphile_worker::WorkerOptions::default()
.concurrency(5)
.schema("graphile_worker")
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?
.run()
.await?;
When to Use Recovery
Worker recovery is useful for deployments where jobs can be interrupted by events outside the normal graceful shutdown path, such as a process crash, network partition, forced abort, or orchestrator shutdown.
When enabled, workers record heartbeats in PostgreSQL. A sweeper can then find workers that have stopped heartbeating, unlock their jobs, release queue locks, and delay the recovered jobs before retrying them.
use graphile_worker::{WorkerOptions, WorkerRecoveryConfig};
use std::time::Duration;
let recovery = WorkerRecoveryConfig::default()
.enabled(true)
.heartbeat_interval(Duration::from_secs(30))
.sweep_interval(Duration::from_secs(60))
.sweep_threshold(Duration::from_secs(300))
.recovery_delay(Duration::from_secs(30));
let worker = WorkerOptions::default()
.worker_recovery(recovery)
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
Recovery is disabled by default, so enable it deliberately for deployments that need stale worker detection and job lock recovery.
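The sweep described in this section can be sketched as a pure function over heartbeat timestamps. A worker counts as stale when its last heartbeat is older than the sweep threshold, and each job it had locked is unlocked and given a run_at that is the recovery delay in the future. The WorkerRow and JobRow types and the sweep function are hypothetical. The real sweeper works against PostgreSQL tables and also releases queue locks, which this sketch leaves out.

```rust
use std::time::Duration;

/// Hypothetical stand-ins for heartbeat and job rows. Times are offsets
/// from an arbitrary start so the sketch stays deterministic.
pub struct WorkerRow {
    pub worker_id: String,
    pub last_heartbeat: Duration,
}

pub struct JobRow {
    pub id: u64,
    pub locked_by: Option<String>,
    pub run_at: Duration,
}

/// Unlocks jobs held by workers whose last heartbeat is older than
/// `threshold`, delaying each recovered job by `recovery_delay`.
/// Returns the ids of the recovered jobs.
pub fn sweep(
    workers: &[WorkerRow],
    jobs: &mut [JobRow],
    now: Duration,
    threshold: Duration,
    recovery_delay: Duration,
) -> Vec<u64> {
    // Workers that have not heartbeated within the threshold.
    let stale: Vec<&str> = workers
        .iter()
        .filter(|w| now.saturating_sub(w.last_heartbeat) > threshold)
        .map(|w| w.worker_id.as_str())
        .collect();

    let mut recovered = Vec::new();
    for job in jobs.iter_mut() {
        let held_by_stale = job
            .locked_by
            .as_deref()
            .map_or(false, |owner| stale.contains(&owner));
        if held_by_stale {
            // Release the lock and push the retry into the future.
            job.locked_by = None;
            job.run_at = now + recovery_delay;
            recovered.push(job.id);
        }
    }
    recovered
}
```

With the configuration above (a 300-second threshold and a 30-second recovery delay), a worker silent for 500 seconds is treated as failed, while one that heartbeated 100 seconds ago keeps its jobs.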
Bad Fit
Graphile Worker RS may not be the right tool when:
- Your application does not use PostgreSQL and you do not want PostgreSQL to be part of the job system.
- You need an in-memory-only queue where jobs can be discarded on restart.
- You need a general message broker that is independent of your database.
- Your workers are not Rust applications.
- Your workload cannot be represented as registered task identifiers with serializable payloads.
- You need to avoid database-managed queue state entirely.
It is also not a replacement for normal request handling. If work must finish before the response can be sent, keep it in the request path. Use Graphile Worker RS when the work can run asynchronously and be retried independently.
Decision Checklist
Choose Graphile Worker RS when most of these are true:
- PostgreSQL is already a required dependency.
- The job must survive application restarts.
- Failed jobs should be retried automatically.
- Jobs can be described with a task identifier and a serialized payload.
- Running the worker in a Rust async process fits your deployment.
- Keeping queue state near application data is simpler than operating another queue service.
Installation
Graphile Worker RS is published as the graphile_worker crate. It needs a
PostgreSQL database and one supported async runtime in your application.
Add the Crate
For the default setup, add the main crate:
cargo add graphile_worker
Or add it to Cargo.toml directly:
[dependencies]
graphile_worker = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
The default feature set enables:
- runtime-tokio
- tls-rustls
- driver-sqlx
This is the recommended starting point for Tokio applications using SQLx and rustls TLS.
Tokio
Tokio is enabled by default, so most applications only need to add Tokio with the runtime features they use:
[dependencies]
graphile_worker = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
A typical Tokio entrypoint looks like this:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
run_worker().await
}
async-std
To use async-std, disable default features and enable runtime-async-std.
Because the default SQLx driver and rustls TLS are also disabled when default
features are disabled, enable them explicitly if you still want that setup:
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = [
"runtime-async-std",
"tls-rustls",
"driver-sqlx",
] }
async-std = { version = "1", features = ["attributes"] }
Applications using the #[async_std::main] macro need async-std's
attributes feature:
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
run_worker().await
}
Driver and TLS Features
The SQLx PostgreSQL driver is enabled by default through driver-sqlx. The
crate also exposes driver-tokio-postgres for Tokio applications.
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = [
"runtime-tokio",
"driver-tokio-postgres",
] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
The driver-tokio-postgres feature enables runtime-tokio and is tested only
with the Tokio runtime. Use driver-sqlx for async-std.
TLS is selected separately from the database driver. The available TLS feature flags are:
- tls-rustls
- tls-native-tls
When you disable default features, choose the runtime, database driver, and TLS features you need explicitly.
DATABASE_URL
Graphile Worker RS connects to PostgreSQL. Local tests and examples in this
repository use a standard PostgreSQL connection string through DATABASE_URL:
export DATABASE_URL='postgres://postgres:postgres@localhost:54233/postgres'
Use the same URL form for your own database, changing the username, password, host, port, and database name as needed.
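Several repository examples read DATABASE_URL first and fall back to a local default. A minimal standard-library sketch of that pattern follows. The choose_database_url and database_url_or_default helpers are hypothetical. The fallback URL is the postgres://postgres:root@localhost:5432 default some examples use, and treating an empty variable as unset is a design choice of this sketch.

```rust
use std::env;

/// Fallback used by several repository examples when DATABASE_URL is unset.
const LOCAL_FALLBACK: &str = "postgres://postgres:root@localhost:5432";

/// Picks the connection string: a non-empty value wins, otherwise the local
/// fallback is used. Kept free of I/O so it is easy to test.
pub fn choose_database_url(from_env: Option<String>) -> String {
    match from_env {
        Some(url) if !url.trim().is_empty() => url,
        _ => LOCAL_FALLBACK.to_string(),
    }
}

/// Reads DATABASE_URL from the process environment and applies the fallback.
pub fn database_url_or_default() -> String {
    choose_database_url(env::var("DATABASE_URL").ok())
}
```

In production you may prefer to fail fast when DATABASE_URL is missing rather than silently connecting to a local database.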
For a worker setup example that creates a SQLx pool and registers jobs, continue to Quick Start. For a complete feature list, see Feature Flags.
Quick Start
This page shows the smallest useful Graphile Worker RS setup: define one typed task, register it with a worker, add a job, and start processing jobs.
Add Dependencies
Graphile Worker RS runs against PostgreSQL. With the default Tokio runtime, rustls TLS, and SQLx driver, a minimal application can connect with a database URL and these dependencies:
[dependencies]
graphile_worker = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde = { version = "1", features = ["derive"] }
Define a Typed Task
A task is a Rust type that can be serialized into the job payload and
deserialized when the worker runs it. Implement TaskHandler for the payload
type and give it a stable identifier.
use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
subject: String,
body: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Sending '{}' to {}", self.subject, self.to);
Ok::<(), String>(())
}
}
The identifier is the task name stored in PostgreSQL. Jobs with this identifier
will be decoded as SendEmail and passed to SendEmail::run.
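The identifier-to-handler mapping can be pictured as a registry keyed by task name: registering a task adds an entry, and each stored job is routed by its identifier. The sketch below is a deliberately simplified, hypothetical model. Payloads are plain strings instead of serde JSON, handlers are function pointers, and an unknown identifier is reported as an error. In Graphile Worker, a worker is expected to claim only jobs whose identifiers it has registered, so that error path mainly guards against misconfiguration.

```rust
use std::collections::HashMap;

/// A handler receives the raw payload and reports success or an error message.
pub type Handler = fn(&str) -> Result<(), String>;

/// Hypothetical registry: task identifier -> handler.
#[derive(Default)]
pub struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    /// Registers a handler under its stable identifier (compare define_job).
    pub fn define(&mut self, identifier: &'static str, handler: Handler) -> &mut Self {
        self.handlers.insert(identifier, handler);
        self
    }

    /// Routes a stored job to the handler registered for its identifier.
    pub fn dispatch(&self, identifier: &str, payload: &str) -> Result<(), String> {
        match self.handlers.get(identifier) {
            Some(handler) => handler(payload),
            None => Err(format!("no handler registered for {identifier:?}")),
        }
    }
}

/// Hypothetical handler standing in for SendEmail::run.
fn send_email(payload: &str) -> Result<(), String> {
    if payload.is_empty() {
        return Err("empty payload".to_string());
    }
    Ok(())
}
```

This is why the identifier must stay stable: renaming it strands any jobs already stored under the old name.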
Create and Run a Worker
Configure a PostgreSQL connection string, register the task with define_job,
initialize the worker, and call run.
use graphile_worker::WorkerOptions;
async fn run_worker() -> Result<(), Box<dyn std::error::Error>> {
let worker = WorkerOptions::default()
.database_url("postgres://postgres:password@localhost/mydb")
.concurrency(5)
.schema("graphile_worker")
.define_job::<SendEmail>()
.init()
.await?;
worker.run().await?;
Ok(())
}
init connects to PostgreSQL, runs the worker migrations for the configured
schema, registers the task handlers, and returns a Worker. The schema defaults
to graphile_worker if you do not set one.
Add a Job
After initialization, use create_utils to get WorkerUtils. Its add_job
method accepts the typed task payload and a JobSpec.
use graphile_worker::JobSpecBuilder;
let helpers = worker.create_utils();
helpers
.add_job(
SendEmail {
to: "ada@example.com".to_string(),
subject: "Welcome".to_string(),
body: "Thanks for signing up.".to_string(),
},
JobSpecBuilder::new().build(),
)
.await?;
For scheduled jobs, set run_at on the job spec before building it. This
example uses chrono; add it to your application if you schedule from Rust
timestamps:
use chrono::{Duration, offset::Utc};
use graphile_worker::JobSpecBuilder;
helpers
.add_job(
SendEmail {
to: "ada@example.com".to_string(),
subject: "Welcome".to_string(),
body: "Thanks for signing up.".to_string(),
},
JobSpecBuilder::new()
.run_at(Utc::now() + Duration::seconds(10))
.build(),
)
.await?;
Complete Shape
This combines the pieces into one application shape. In a real application you can add jobs from request handlers, services, or startup code while one or more workers process them.
use graphile_worker::{
IntoTaskHandlerResult, JobSpecBuilder, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
subject: String,
body: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Sending '{}' to {}", self.subject, self.to);
Ok::<(), String>(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let worker = WorkerOptions::default()
.database_url("postgres://postgres:password@localhost/mydb")
.concurrency(5)
.schema("graphile_worker")
.define_job::<SendEmail>()
.init()
.await?;
worker
.create_utils()
.add_job(
SendEmail {
to: "ada@example.com".to_string(),
subject: "Welcome".to_string(),
body: "Thanks for signing up.".to_string(),
},
JobSpecBuilder::new().build(),
)
.await?;
worker.run().await?;
Ok(())
}
For a runnable example in the repository, see examples/simple.rs. For more
configuration options, continue with Configuration.
First Worker
This page walks through the smallest useful Graphile Worker RS flow:
- Create a PostgreSQL pool.
- Define a task handler.
- Configure WorkerOptions.
- Initialize the worker.
- Add a job.
- Run the worker.
- Let shutdown happen gracefully.
The examples use Tokio and SQLx, matching the default setup used by the repository examples.
Define a Handler
A worker processes jobs by matching each job's task identifier to a registered
TaskHandler. The handler type is also the payload type, so it should derive
Serialize and Deserialize.
use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SayHello {
message: String,
}
impl TaskHandler for SayHello {
const IDENTIFIER: &'static str = "say_hello";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Hello {} !", self.message);
Ok::<(), String>(())
}
}
IDENTIFIER is the task name stored in PostgreSQL. Register the same handler
type on any worker that should be able to process jobs with that identifier.
Create the Pool
You can pass an existing SQLx PostgreSQL pool with pg_pool. This is useful
when your application already owns database configuration.
use std::str::FromStr;
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
let pg_options = PgConnectOptions::from_str(
"postgres://postgres:root@localhost:5432",
)?;
let pg_pool = PgPoolOptions::new()
.max_connections(5)
.connect_with(pg_options)
.await?;
You can also let Graphile Worker create the pool from a URL with
database_url. When using database_url, max_pg_conn controls the pool size
and defaults to 20 if it is not set.
let options = graphile_worker::WorkerOptions::default()
.database_url("postgres://postgres:root@localhost:5432")
.max_pg_conn(5);
If both an existing database connection and database_url are provided, the
existing connection takes precedence.
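The two connection rules on this page can be summarized in a small decision function: an existing connection wins over database_url, and a pool built from a URL defaults to 20 connections. The ConnectionSource enum and resolve_connection function are a hypothetical model for illustration only, not part of graphile_worker.

```rust
/// Hypothetical summary of where the worker's database connection comes from.
#[derive(Debug, PartialEq)]
pub enum ConnectionSource<P> {
    /// An existing pool or connection supplied by the application.
    Existing(P),
    /// A pool built from a URL, with its maximum size.
    FromUrl { url: String, max_connections: u32 },
}

/// Documented default for max_pg_conn when database_url is used.
const DEFAULT_MAX_PG_CONN: u32 = 20;

pub fn resolve_connection<P>(
    existing: Option<P>,
    database_url: Option<&str>,
    max_pg_conn: Option<u32>,
) -> Option<ConnectionSource<P>> {
    if let Some(pool) = existing {
        // An existing connection takes precedence over database_url.
        return Some(ConnectionSource::Existing(pool));
    }
    database_url.map(|url| ConnectionSource::FromUrl {
        url: url.to_string(),
        max_connections: max_pg_conn.unwrap_or(DEFAULT_MAX_PG_CONN),
    })
}
```

Note that max_pg_conn has no effect when an existing pool is supplied; size that pool yourself, as the PgPoolOptions example does with max_connections(5).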
Configure and Initialize
WorkerOptions is a builder. The usual first-worker options are:
- concurrency: maximum jobs processed at the same time. If omitted, it defaults to the number of logical CPUs.
- schema: PostgreSQL schema used for Graphile Worker tables. If omitted, it defaults to graphile_worker.
- define_job::<T>(): registers a TaskHandler type.
- pg_pool, database, or database_url: provides the database connection.
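The two defaults in this list can be expressed with the standard library. In this sketch, std::thread::available_parallelism stands in for "the number of logical CPUs"; graphile_worker may compute that number differently. The resolve_defaults helper is hypothetical.

```rust
use std::thread;

/// Default schema name documented for WorkerOptions.
const DEFAULT_SCHEMA: &str = "graphile_worker";

/// Hypothetical helper: fills in omitted concurrency and schema values.
pub fn resolve_defaults(concurrency: Option<usize>, schema: Option<&str>) -> (usize, String) {
    let concurrency = concurrency.unwrap_or_else(|| {
        // Approximates "number of logical CPUs"; falls back to 1 if unknown.
        thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
    });
    let schema = schema.unwrap_or(DEFAULT_SCHEMA).to_string();
    (concurrency, schema)
}
```

Setting concurrency explicitly, as the examples do, keeps behavior the same across machines with different core counts.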
Calling init().await connects to the database if needed, runs migrations for
the selected schema, registers task details, creates the worker id, and returns
a ready Worker.
use graphile_worker::WorkerOptions;
let worker = WorkerOptions::default()
.concurrency(2)
.schema("example_simple_worker")
.define_job::<SayHello>()
.pg_pool(pg_pool)
.init()
.await?;
Add a Job
After initialization, create utilities from the worker and enqueue a job.
JobSpec::default() schedules it with default job options.
use graphile_worker::JobSpec;
let helpers = worker.create_utils();
helpers
.add_job(
SayHello {
message: "world".to_string(),
},
JobSpec::default(),
)
.await?;
To schedule a job for later, build a JobSpec with JobSpecBuilder.
use chrono::{Duration, Utc};
use graphile_worker::JobSpecBuilder;
helpers
.add_job(
SayHello {
message: "world".to_string(),
},
JobSpecBuilder::new()
.run_at(Utc::now() + Duration::seconds(10))
.build(),
)
.await?;
Run the Worker
run() starts the long-running worker loop. It registers the worker, starts job
sources, processes jobs until shutdown is requested, waits for batchers to
finish, emits shutdown hooks, and deregisters the worker.
worker.run().await?;
For scripts and smoke tests, run_once() processes currently available jobs and
then returns.
worker.run_once().await?;
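The difference between run() and run_once() can be sketched with an in-memory queue: run_once works through whatever is queued and then returns, while run keeps waiting for new work until shutdown. Only the run_once side is modeled below. The Job alias and run_once_sketch function are hypothetical, and the real method works against PostgreSQL rather than a VecDeque.

```rust
use std::collections::VecDeque;

/// Hypothetical job: a task identifier plus a raw payload.
pub type Job = (&'static str, String);

/// Processes every job that is queued right now, then returns instead of
/// waiting for new work. Failures are only counted here; a real worker
/// records the error and reschedules the job for a later retry.
/// Returns (succeeded, failed).
pub fn run_once_sketch<F>(queue: &mut VecDeque<Job>, mut handle: F) -> (usize, usize)
where
    F: FnMut(&Job) -> Result<(), String>,
{
    let (mut succeeded, mut failed) = (0, 0);
    while let Some(job) = queue.pop_front() {
        match handle(&job) {
            Ok(()) => succeeded += 1,
            Err(_) => failed += 1,
        }
    }
    (succeeded, failed)
}
```

Because a failed job is rescheduled for later, a single pass does not retry it; a second run_once call after the backoff delay would.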
Full Example
use std::str::FromStr;
use graphile_worker::{
IntoTaskHandlerResult, JobSpec, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
#[derive(Deserialize, Serialize)]
struct SayHello {
message: String,
}
impl TaskHandler for SayHello {
const IDENTIFIER: &'static str = "say_hello";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Hello {} !", self.message);
Ok::<(), String>(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pg_options = PgConnectOptions::from_str(
"postgres://postgres:root@localhost:5432",
)?;
let pg_pool = PgPoolOptions::new()
.max_connections(5)
.connect_with(pg_options)
.await?;
let worker = WorkerOptions::default()
.concurrency(2)
.schema("example_simple_worker")
.define_job::<SayHello>()
.pg_pool(pg_pool)
.init()
.await?;
let helpers = worker.create_utils();
helpers
.add_job(
SayHello {
message: "world".to_string(),
},
JobSpec::default(),
)
.await?;
worker.run().await?;
Ok(())
}
Shutdown
By default, the worker listens for OS shutdown signals such as Ctrl+C and SIGTERM. When a shutdown signal arrives, in-flight jobs are given a grace period to finish. The default grace period is 5 seconds, and shutdown-aborted jobs are retried after a default delay of 30 seconds.
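The grace-period rule can be sketched as a pure function. Jobs that finish within the grace period complete normally; any still running when it expires are aborted and retried after the interrupted-job retry delay. The 5-second and 30-second defaults come from this page. The InFlight and Outcome types and the plan_shutdown function are hypothetical, and time is measured from the moment the shutdown signal arrives.

```rust
use std::time::Duration;

/// Defaults described above for graceful shutdown.
pub const DEFAULT_GRACE_PERIOD: Duration = Duration::from_secs(5);
pub const DEFAULT_INTERRUPTED_RETRY_DELAY: Duration = Duration::from_secs(30);

/// Hypothetical view of a running job: work left when the signal arrives.
pub struct InFlight {
    pub id: u64,
    pub remaining: Duration,
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// Finished within the grace period.
    Completed(u64),
    /// Aborted at the end of the grace period; retried this long afterwards.
    Rescheduled { id: u64, retry_after: Duration },
}

pub fn plan_shutdown(jobs: &[InFlight], grace: Duration, retry_delay: Duration) -> Vec<Outcome> {
    jobs.iter()
        .map(|job| {
            if job.remaining <= grace {
                Outcome::Completed(job.id)
            } else {
                Outcome::Rescheduled { id: job.id, retry_after: retry_delay }
            }
        })
        .collect()
}
```

A longer grace period lets slow jobs finish but delays process exit, so match it to your orchestrator's termination timeout.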
If the host application already owns shutdown handling, pass a custom shutdown future and disable OS signal listeners:
use graphile_worker::{WorkerOptions, WorkerShutdownConfig};
use std::time::Duration;
let shutdown = WorkerShutdownConfig::default()
.listen_os_shutdown_signals(false)
.grace_period(Duration::from_secs(10))
.interrupted_job_retry_delay(Duration::from_secs(30))
.shutdown_signal(async {
// Await your application's own stop trigger here. The worker stops when this
// future completes, so an empty block like this one stops it immediately.
});
let worker = WorkerOptions::default()
.worker_shutdown(shutdown)
.define_job::<SayHello>()
.pg_pool(pg_pool)
.init()
.await?;
worker.run().await?;
The same settings can also be configured directly on WorkerOptions with
listen_os_shutdown_signals, shutdown_signal, shutdown_grace_period, and
shutdown_interrupted_job_retry_delay.
Next, see Scheduling Jobs for job timing options and Worker Options for more configuration details.
Examples Tour
The repository includes runnable examples that show how the worker APIs fit
together in real programs. This page tours each file under examples/ and
calls out what to learn from it.
Most examples connect to PostgreSQL, initialize a worker, add one or more jobs,
and then run the worker. Several examples fall back to
postgres://postgres:root@localhost:5432; others read DATABASE_URL first.
Basic worker flow
examples/simple.rs
Start here if you want the smallest complete worker shape:
- Define a serializable task payload.
- Implement TaskHandler with an IDENTIFIER.
- Build WorkerOptions, register the task, attach a PostgreSQL pool, and call init().
- Use worker.create_utils() to enqueue work.
- Call worker.run() to process jobs continuously.
The task intentionally fails most of the time so you can observe retry behavior:
impl TaskHandler for SayHello {
const IDENTIFIER: &'static str = "say_hello";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Hello {} !", self.message);
if rand::rng().random_range(0..100) < 70 {
return Err("Failed".to_string());
}
Ok(())
}
}
It also schedules its first job for 10 seconds in the future with
JobSpecBuilder::run_at.
examples/run_once.rs
Use this example when you want a bounded worker pass instead of a long-running
process. It schedules 10 SayHello jobs, then calls:
worker.run_once().await.unwrap();
This is useful for learning the difference between continuous workers and a single processing pass.
Sharing application state
examples/app_state.rs
This example shows how to attach application-owned state to the worker and read it from task handlers.
AppState wraps an Arc<AtomicUsize>, is installed with
WorkerOptions::add_extension, and is retrieved from the task context:
let app_state = ctx.get_ext::<AppState>().unwrap();
let run_count = app_state.increment_run_count();
println!("Run count: {run_count}");
The file also combines normal enqueued jobs with a cron definition using
Cron::every_minute::<ShowRunCount>().fill(CrontabFill::hours(1)), so it is a
compact example of state plus recurring jobs.
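One common way to implement an add_extension / get_ext pair is a map keyed by TypeId that stores boxed Any values. The sketch below uses that approach to show why lookups are by type and why AppState wraps an Arc<AtomicUsize>. The Extensions type is hypothetical and is not necessarily how graphile_worker stores extensions. The AppState shown is a guess at the example's shape, not a copy of examples/app_state.rs.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical type-keyed store: at most one value per type.
#[derive(Default)]
pub struct Extensions {
    values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl Extensions {
    pub fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.values.insert(TypeId::of::<T>(), Box::new(value));
    }

    pub fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.values.get(&TypeId::of::<T>())?.downcast_ref::<T>()
    }
}

/// Cloneable handle: every clone shares the same counter through the Arc.
#[derive(Clone, Default)]
pub struct AppState {
    run_count: Arc<AtomicUsize>,
}

impl AppState {
    /// Increments the shared counter and returns the new value.
    pub fn increment_run_count(&self) -> usize {
        self.run_count.fetch_add(1, Ordering::SeqCst) + 1
    }
}
```

Because the stored value is a clone, the counter has to live behind an Arc so increments made from task handlers are visible to the rest of the application.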
Scheduling work from a handler
examples/context_helpers.rs
This example demonstrates WorkerContextExt helper methods inside a running
task. SendWs represents an initial operation, then schedules a CheckWs
follow-up job from inside run:
ctx.add_job(
CheckWs {
request_id: self.request_id.clone(),
},
JobSpecBuilder::new()
.run_at(Utc::now() + Duration::seconds(10))
.build(),
)
.await?;
Use this pattern for workflows where one job needs to create the next step after it succeeds.
Adding jobs in batches
examples/batch_add_jobs.rs
This file compares two batch insertion APIs:
- add_jobs::<SendEmail>(&emails) for many jobs with the same typed payload.
- add_raw_jobs(&mixed_jobs) for heterogeneous jobs where each item provides an identifier, JSON payload, and JobSpec.
The raw batch includes both send_email and process_payment jobs, and uses
JobSpecBuilder::priority(-10) for one urgent payment job:
let mixed_jobs = vec![
RawJobSpec {
identifier: "send_email".into(),
payload: json!({ "to": "dave@example.com", "subject": "Notification" }),
spec: JobSpec::default(),
},
RawJobSpec {
identifier: "process_payment".into(),
payload: json!({ "user_id": 123, "amount": 50 }),
spec: JobSpecBuilder::new().priority(-10).build(),
},
];
Use typed batches when all payloads share one handler type. Use raw batches when you need one database insert for multiple task identifiers.
Cron jobs
examples/crontab.rs
This example focuses on recurring jobs. It registers two task types and defines
a cron entry for SayHello:
Cron::every_n_minutes::<SayHello>(2)
.unwrap()
.fill(CrontabFill::minutes(10))
.job_key("say_hello_dedupe")
.job_key_mode(CronJobKeyMode::PreserveRunAt)
.payload(SayHello {
message: "Crontab".to_string(),
})
.unwrap()
The example shows how to set:
- the interval with every_n_minutes;
- backfill with CrontabFill;
- a job_key for deduplication;
- the job key mode, CronJobKeyMode::PreserveRunAt;
- a typed payload for the recurring job.
See also Cron Jobs for a broader explanation of recurring work.
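Job keys deduplicate work: adding a job whose key matches an existing pending job updates that job instead of inserting a second one. The sketch below models two modes with a HashMap. It assumes the semantics documented for the Node.js Graphile Worker: replace overwrites the payload and run_at (useful for debouncing), while preserve_run_at updates the payload but keeps the original run_at (useful for throttling). The KeyMode enum, PendingJob struct, and add_with_key function are hypothetical. The sketch also ignores how already-locked jobs are handled.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy)]
pub enum KeyMode {
    /// Overwrite payload and run_at.
    Replace,
    /// Overwrite payload but keep the existing run_at.
    PreserveRunAt,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PendingJob {
    pub payload: String,
    pub run_at: u64,
}

/// Hypothetical keyed insert: at most one pending job per key.
pub fn add_with_key(
    jobs: &mut HashMap<String, PendingJob>,
    key: &str,
    payload: &str,
    run_at: u64,
    mode: KeyMode,
) {
    let new_job = PendingJob { payload: payload.to_string(), run_at };
    match jobs.get_mut(key) {
        Some(existing) => match mode {
            KeyMode::Replace => *existing = new_job,
            KeyMode::PreserveRunAt => existing.payload = new_job.payload,
        },
        None => {
            jobs.insert(key.to_string(), new_job);
        }
    }
}
```

With preserve_run_at, a cron entry that keeps re-adding the same key cannot push its pending run further into the future.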
Plugins and hooks
examples/hooks.rs
This is the entry point for the hooks example. It wires together:
- ProcessData, the task being processed;
- MetricsPlugin, which observes worker and job lifecycle events;
- ValidationPlugin, which can continue, skip, or fail a job before it runs;
- logging::enable_logs, a small tracing setup.
It enqueues four jobs so you can see each path: normal processing, skip, forced failure, and another normal job.
examples/hooks/process_data.rs
This file defines the payload used by the hooks example:
pub(super) struct ProcessData {
pub(super) value: i32,
#[serde(default)]
pub(super) skip: bool,
#[serde(default)]
pub(super) force_fail: bool,
}
The handler itself only prints the value. The interesting behavior comes from
plugins inspecting skip and force_fail.
examples/hooks/validation.rs
ValidationPlugin registers a BeforeJobRun hook. It reads the raw JSON
payload from the hook context:
- when skip is true, it returns HookResult::Skip;
- when force_fail is true, it returns HookResult::Fail(...);
- otherwise it returns HookResult::Continue.
This is the file to study when a plugin needs to make a pre-run decision.
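The decision above can be sketched without the library types. This is a minimal model, not the plugin's source. HookDecision is a hypothetical local enum standing in for HookResult, and the payload is reduced to two booleans instead of raw JSON. The sketch checks skip before force_fail, following the order listed above; the real plugin's precedence when both flags are set is an assumption.

```rust
/// Hypothetical stand-in for the library's HookResult, for illustration only.
#[derive(Debug, PartialEq)]
enum HookDecision {
    Continue,
    Skip,
    Fail(String),
}

/// The two flags the validation plugin reads from the job payload.
/// (In the example, missing fields default to false via #[serde(default)].)
struct Flags {
    skip: bool,
    force_fail: bool,
}

/// Decide what should happen before the job runs.
fn before_job_run(flags: &Flags) -> HookDecision {
    if flags.skip {
        HookDecision::Skip
    } else if flags.force_fail {
        HookDecision::Fail("forced failure requested by payload".to_string())
    } else {
        HookDecision::Continue
    }
}
```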
examples/hooks/metrics.rs
MetricsPlugin registers lifecycle hooks for:
- WorkerStart;
- WorkerShutdown;
- JobStart;
- JobComplete;
- JobFail.
It keeps atomic counters for started, completed, and failed jobs, then prints final counts during shutdown.
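The counting pattern can be sketched with std atomics. JobMetrics and its method names are hypothetical and only mirror the three counts described above. Relaxed ordering is enough here because each counter is independent, and joining the worker threads before reading makes every increment visible.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical counters mirroring what MetricsPlugin tracks.
#[derive(Default)]
struct JobMetrics {
    started: AtomicUsize,
    completed: AtomicUsize,
    failed: AtomicUsize,
}

impl JobMetrics {
    // Called from the JobStart hook in this sketch.
    fn on_job_start(&self) {
        self.started.fetch_add(1, Ordering::Relaxed);
    }

    // Called from the JobComplete hook in this sketch.
    fn on_job_complete(&self) {
        self.completed.fetch_add(1, Ordering::Relaxed);
    }

    // Called from the JobFail hook in this sketch.
    fn on_job_fail(&self) {
        self.failed.fetch_add(1, Ordering::Relaxed);
    }

    /// Final counts, as printed during WorkerShutdown.
    fn summary(&self) -> String {
        format!(
            "started={} completed={} failed={}",
            self.started.load(Ordering::Relaxed),
            self.completed.load(Ordering::Relaxed),
            self.failed.load(Ordering::Relaxed),
        )
    }
}
```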
examples/hooks/logging.rs
This helper installs a tracing_subscriber registry with a debug-level filter
and sqlx=warn. It is intentionally small so the hook examples can focus on
plugin behavior.
Local queue behavior
examples/local_queue.rs
This example shows how to enable and observe local queue batching. It configures the worker with:
.local_queue(
LocalQueueConfig::default()
.with_size(50)
.with_ttl(Duration::from_secs(60))
.with_refetch_delay(
RefetchDelayConfig::default()
.with_duration(Duration::from_millis(100))
.with_threshold(5)
.with_max_abort_threshold(20),
),
)
The local plugin logs queue-specific hooks such as initialization, mode changes,
jobs fetched, jobs returned, refetch delay start, delay abort, and delay expiry.
The example enqueues 20 short ProcessItem jobs so you can watch batch fetching
and refetch-delay behavior in logs.
Worker utilities in a web server
The sendable_worker example is split across several files. Together they show
that a worker can run beside an HTTP server while WorkerUtils is shared with
request handlers.
examples/sendable_worker.rs
This is the example entry point. It:
- builds a worker with Worker::options();
- registers ExampleTask and DatabaseTask;
- wraps worker.create_utils() in Arc;
- binds a TCP listener on 127.0.0.1:3000;
- spawns the worker with tokio::spawn;
- spawns a simple HTTP accept loop;
- starts a second worker against the same database URL.
The application waits with tokio::select! for a worker task, server task,
secondary worker task, or Ctrl+C.
examples/sendable_worker/tasks.rs
This file defines the two task handlers used by the HTTP example:
- ExampleTask logs a name and value, sleeps for 100 milliseconds, and succeeds.
- DatabaseTask uses ctx.pg_pool() to run SELECT COUNT(*) FROM graphile_worker.jobs and logs the current job count.
Study this file when a task needs access to the worker's PostgreSQL pool.
examples/sendable_worker/http.rs
This file contains the request dispatcher. It reads the HTTP request line, extracts the method and path, and routes:
- GET / and GET /health;
- POST /schedule/example?...;
- POST /schedule/database?...;
- everything else to 404 Not Found.
It is deliberately minimal HTTP parsing, enough to demonstrate sharing
WorkerUtils with request handlers.
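The dispatch step can be sketched against the request-line format alone. Route and route are hypothetical names. Mapping both GET / and GET /health to a single Status route is a simplification, because what the example returns for each path is not described here.

```rust
/// Routes of the example's HTTP surface, as listed above.
#[derive(Debug, PartialEq)]
enum Route {
    Status,
    ScheduleExample,
    ScheduleDatabase,
    NotFound,
}

/// Parse a request line such as "POST /schedule/example?name=a HTTP/1.1".
/// Returns the route and the raw query string, if there is one.
fn route(request_line: &str) -> (Route, Option<&str>) {
    let mut parts = request_line.split_whitespace();
    let method = parts.next().unwrap_or("");
    let target = parts.next().unwrap_or("");
    // Split the path from the query string at the first '?'.
    let (path, query) = match target.split_once('?') {
        Some((path, query)) => (path, Some(query)),
        None => (target, None),
    };
    let route = match (method, path) {
        ("GET", "/") | ("GET", "/health") => Route::Status,
        ("POST", "/schedule/example") => Route::ScheduleExample,
        ("POST", "/schedule/database") => Route::ScheduleDatabase,
        _ => Route::NotFound,
    };
    (route, query)
}
```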
examples/sendable_worker/http/schedule.rs
This file schedules jobs from HTTP routes.
schedule_example_task parses name and value, then calls
utils.add_job(ExampleTask { name, value }, Default::default()).
schedule_database_task requires a query parameter and uses a richer
JobSpecBuilder:
let job_spec = JobSpecBuilder::new()
.priority(-10)
.run_at(chrono::Utc::now() + chrono::Duration::seconds(10))
.job_key(format!("db_task_{}", chrono::Utc::now().timestamp()))
.build();
This demonstrates scheduling from request input with priority, delay, and job key options.
examples/sendable_worker/http/query.rs
This helper parses query parameters into a HashMap<String, String> and applies
a small percent-decoding function for the encoded characters used by the
example routes.
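A more general version of that helper might look like the following. It is a sketch, not the example's code. It decodes any %XX escape and treats + as a space, whereas the example's helper only handles the characters its routes need. Invalid or truncated escapes are kept verbatim.

```rust
use std::collections::HashMap;

/// Value of one ASCII hex digit, or None if the byte is not a hex digit.
fn hex_value(b: u8) -> Option<u8> {
    (b as char).to_digit(16).map(|d| d as u8)
}

/// Decode '+' as a space and %XX escapes; leave malformed escapes as-is.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'+' => {
                out.push(b' ');
                i += 1;
            }
            b'%' if i + 2 < bytes.len() => match (hex_value(bytes[i + 1]), hex_value(bytes[i + 2])) {
                (Some(hi), Some(lo)) => {
                    out.push(hi * 16 + lo);
                    i += 3;
                }
                _ => {
                    out.push(b'%');
                    i += 1;
                }
            },
            other => {
                out.push(other);
                i += 1;
            }
        }
    }
    String::from_utf8_lossy(&out).into_owned()
}

/// Split "a=1&b=2" into decoded key/value pairs; a bare key maps to "".
fn parse_query(query: &str) -> HashMap<String, String> {
    query
        .split('&')
        .filter(|pair| !pair.is_empty())
        .map(|pair| {
            let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
            (percent_decode(key), percent_decode(value))
        })
        .collect()
}
```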
examples/sendable_worker/http/response.rs
This helper writes a plain-text HTTP response to a TcpStream, including status
line, content length, content type, and body.
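A writer of this shape can be made generic over std::io::Write, so a Vec<u8> can stand in for the TcpStream in tests. write_response is a hypothetical name, and the exact header order and content type used by the example are assumptions.

```rust
use std::io::{self, Write};

/// Write a minimal plain-text HTTP/1.1 response to any writer.
/// Content-Length is the body's length in bytes.
fn write_response<W: Write>(out: &mut W, status: &str, body: &str) -> io::Result<()> {
    write!(
        out,
        "HTTP/1.1 {}\r\nContent-Length: {}\r\nContent-Type: text/plain\r\n\r\n{}",
        status,
        body.len(),
        body
    )?;
    out.flush()
}
```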
Core Concepts
Graphile Worker RS is a PostgreSQL-backed job queue for Rust applications. The core mental model is:
- Your application describes work as typed tasks.
- It adds jobs for those tasks into PostgreSQL.
- A worker fetches runnable jobs, executes the matching task handler, and records the result back in PostgreSQL.
The database is the coordination point. Jobs, task metadata, queue state,
worker state, retry metadata, and scheduling data live in the Graphile Worker
schema, which defaults to graphile_worker.
The Main Pieces
Tasks
A task is the Rust type that defines what a job does. It implements
TaskHandler, provides a stable task identifier, accepts a serializable payload,
and contains the async run method for the work.
use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
subject: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Sending email to {}", self.to);
Ok::<(), String>(())
}
}
Register task handlers on WorkerOptions before starting the worker:
let worker = graphile_worker::WorkerOptions::default()
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
For more detail, see Tasks and Payloads.
Jobs
A job is a stored request to run a task. It includes the task identifier, payload, state, and execution metadata. Job specifications can also carry options such as priority, retry behavior, scheduling information, and queue selection.
Jobs are stored in PostgreSQL, so application processes can enqueue work and
worker processes can execute it independently. WorkerUtils provides utility
operations for managing jobs, such as adding, removing, and rescheduling them.
Workers
A worker is the runtime process that polls PostgreSQL, locks runnable jobs, and
executes registered handlers with the configured concurrency. Graphile Worker RS
uses PostgreSQL features such as SKIP LOCKED for efficient job fetching and
LISTEN/NOTIFY for low-latency wakeups.
graphile_worker::WorkerOptions::default()
.concurrency(5)
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?
.run()
.await?;
Workers also own lifecycle concerns such as graceful shutdown. Optional recovery settings can record worker heartbeats and recover jobs locked by workers that no longer heartbeat.
For more detail, see Worker Lifecycle and Architecture.
Scheduling
Scheduling determines when a job becomes runnable. A job can be available
immediately or scheduled for a later time. Graphile Worker RS also exposes cron
support through Cron and CronBuilder for recurring work.
Use scheduling when the time a job should run matters more than when it was created, such as sending a reminder later or running a daily report.
For more detail, see Scheduling Model.
Queues and Concurrency
Concurrency controls how many jobs a worker may process at once. Queues add coordination around groups of jobs. They are useful when some work must be serialized or separated from other work.
The database schema includes _private_job_queues, which tracks queue names for
serialized job execution. Worker configuration controls the amount of parallel
work a process can take on.
For more detail, see Queues and Concurrency.
Database Schema
Graphile Worker RS manages its own PostgreSQL schema and migrations. The default
schema name is graphile_worker, and the crate documentation identifies these
core tables:
- _private_jobs stores job data, state, and execution metadata.
- _private_tasks tracks registered task types.
- _private_job_queues manages queue names for serialized job execution.
- _private_workers tracks active worker instances.
For more detail, see Database Schema.
How The Pieces Fit Together
In a typical application:
- Define one Rust type per task and implement TaskHandler.
- Configure a WorkerOptions value with a PostgreSQL pool and task definitions.
- Initialize the worker so migrations and setup can run.
- Add jobs from application code or utilities.
- Run one or more worker processes to execute runnable jobs.
The important boundary is between tasks and jobs. A task is code compiled into your application. A job is data stored in PostgreSQL that says which task should run, with which payload, and under which execution rules.
Where To Go Next
Start with Architecture for the system-level view, then read Tasks and Payloads and Worker Lifecycle to understand the main programming model. After that, use Scheduling Model, Queues and Concurrency, and Database Schema for the specific behavior you need to configure or operate.
Architecture
Graphile Worker RS is a PostgreSQL-backed job queue. Application code writes jobs into the Graphile Worker schema, worker processes lock ready jobs with PostgreSQL row locks, task handlers run in the configured async runtime, and the worker persists completion, retry, failure, or recovery state back to the database.
The public crate is split into a small set of user-facing concepts:
- WorkerOptions configures a worker, registers task handlers, and creates a Worker.
- Worker runs the job loop until shutdown.
- WorkerUtils adds and manages jobs outside the worker loop.
- TaskHandler defines the Rust code that handles a job payload.
- JobSpec configures scheduling, queues, attempts, priority, flags, and job keys for inserted jobs.
Internally, the root crate coordinates smaller workspace crates. The query crate owns SQL construction, the job/task/spec crates define shared data structures, the lifecycle hook crate exposes extension points, and the runtime facade keeps the worker logic independent of a specific async runtime feature.
Data Flow
The normal path from enqueue to release is:
- Application code adds a job through WorkerUtils or the lower-level add_job SQL wrapper.
- PostgreSQL stores the job in the worker schema and emits the database notification used by listeners.
- A running worker receives a signal from LISTEN jobs:insert, periodic polling, run_once, or an internal local-queue signal.
- Worker tasks fetch ready jobs with for update skip locked, filtered to the task identifiers registered in this worker.
- The worker builds a WorkerContext, runs the matching TaskHandler, and catches task errors and panics.
- The release path deletes completed jobs, reschedules retryable failures, or returns interrupted jobs for recovery.
application
|
| add_job(identifier, payload, JobSpec)
v
PostgreSQL worker schema
|
| LISTEN/NOTIFY or polling
v
Worker job loop
|
| get_job / batch_get_jobs
v
TaskHandler::run(WorkerContext)
|
+-- success ----------> complete_job: delete job, unlock queue
|
+-- task failure -----> fail_job: save error, unlock, retry later
|
+-- shutdown abort ---> recovery return: decrement attempt, unlock, delay
Adding Jobs
The query layer inserts jobs by calling the schema's add_job database
function with the task identifier, JSON payload, and JobSpec fields:
let job = worker
.create_utils()
.add_raw_job(
"send_email",
serde_json::json!({ "to": "a@example.com" }),
graphile_worker::JobSpec::default(),
)
.await?;
The SQL wrapper passes these options to PostgreSQL:
- identifier: the task name registered by a handler.
- payload: the JSON payload stored with the job.
- queue_name: optional queue serialization key.
- run_at: optional scheduled time.
- max_attempts: optional retry limit.
- job_key and job_key_mode: optional job de-duplication/update behavior.
- priority: lower values are fetched first.
- flags: labels that workers may skip through forbidden_flags.
When tracing context is active, the insert path can add trace information into
the payload before writing it. If use_local_time is enabled and no run_at
is supplied, the Rust process time is used as the job's run time; otherwise the
SQL layer uses database time.
Fetching Jobs
Workers do not scan every job type. During initialization they know the task
identifiers they can run, and fetch only jobs whose task_id is in that set.
The fetch query requires:
- is_available = true
- run_at <= now(), or the supplied local timestamp
- no forbidden flags selected by this worker
- an available queue when the job belongs to a named queue
Jobs are ordered by priority asc, run_at asc, then locked with:
for update
skip locked
After selecting a row, the same query increments attempts, sets locked_by
to the worker id, and records locked_at. For queued jobs, the related
job_queues row is also locked by the worker. This is the core coordination
mechanism that lets multiple worker processes share the same PostgreSQL queue
without taking the same job.
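The ordering and claiming rules can be simulated in memory. This is a sequential sketch, not the SQL. Job and claim_next are hypothetical, time is a plain integer, and the is_available, flag, and queue checks are left out. Skipping rows that already have locked_by set plays the role of skip locked, so a later caller never receives a job an earlier one claimed.

```rust
/// Simplified job row with only the fields the fetch step touches.
struct Job {
    id: u64,
    priority: i16,
    run_at: i64,
    attempts: u16,
    locked_by: Option<String>,
}

/// Pick the due, unlocked job with the lowest (priority, run_at), then claim it:
/// bump attempts and record the worker id, as the fetch query does.
fn claim_next(jobs: &mut [Job], worker_id: &str, now: i64) -> Option<u64> {
    let job = jobs
        .iter_mut()
        // Locked rows are skipped rather than waited on.
        .filter(|job| job.locked_by.is_none() && job.run_at <= now)
        .min_by_key(|job| (job.priority, job.run_at))?;
    job.attempts += 1;
    job.locked_by = Some(worker_id.to_string());
    Some(job.id)
}
```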
Worker Loop
Worker::run() performs the runtime lifecycle in this order:
- Register the worker when recovery support is enabled.
- Emit worker start hooks.
- Spawn recovery background tasks.
- Run the crontab scheduler and job runner together.
- Wait for completion and failure batchers to flush on shutdown.
- Stop recovery tasks, emit shutdown hooks, and deregister the worker.
The job runner listens for job signals. A signal can come from the PostgreSQL
listener, the polling interval, run_once, or the local queue. Each signal is
fanned out to the configured concurrency so idle worker tasks can attempt to
fetch jobs. Signals are wakeup hints rather than durable work items, so the
dispatcher coalesces them when worker tasks are already saturated. This keeps
PostgreSQL notification listeners drained while polling still provides the
fallback path for missed or future work.
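A channel with capacity one is a simple way to get this coalescing behavior from the standard library. Each wakeup is sent with try_send, and if a hint is already pending, the extra one is dropped. This is a generic sketch, not the crate's dispatcher, which also fans each signal out across the configured concurrency. notify and pending_wakeups are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Send a wakeup hint without blocking.
fn notify(tx: &SyncSender<()>) {
    // Full: a wakeup is already pending, so this one is redundant.
    // Disconnected: the dispatcher has stopped and there is nobody to wake.
    let _ = tx.try_send(());
}

/// Drain pending hints; returns how many wakeups they actually produce.
fn pending_wakeups(rx: &Receiver<()>) -> usize {
    rx.try_iter().count()
}
```

Because a hint carries no job data, dropping duplicates loses nothing: the next fetch sees every runnable row anyway.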
There are two fetch modes:
- Direct mode fetches one job per worker task with get_job.
- Local queue mode prefetches jobs with batch_get_jobs and wakes worker tasks through an internal signal when cached jobs are available.
Both modes end in the same execution and release logic.
Running a Task
After a job is fetched, the worker looks up the registered handler by the job's task identifier. If the job has no task identifier, or no handler is registered for that identifier, the job is released through the failure path.
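The lookup-or-fail step can be sketched with a map from identifier to handler. Registry, Handler, and send_email are hypothetical, and payloads are plain strings rather than JSON. Both lookup failures surface as Err, which corresponds to the failure path.

```rust
use std::collections::HashMap;

/// A handler receives the raw payload and reports success or a failure message.
type Handler = fn(&str) -> Result<(), String>;

/// Hypothetical registry keyed by task identifier.
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    fn run(&self, task_identifier: Option<&str>, payload: &str) -> Result<(), String> {
        // A job without an identifier cannot be matched to any handler.
        let identifier = task_identifier.ok_or("job has no task identifier")?;
        // An unknown identifier is also treated as a failure.
        let handler = self
            .handlers
            .get(identifier)
            .ok_or_else(|| format!("no handler registered for {identifier}"))?;
        handler(payload)
    }
}

/// Hypothetical handler that always succeeds.
fn send_email(_payload: &str) -> Result<(), String> {
    Ok(())
}
```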
For a valid job, the worker creates a WorkerContext containing the shared
job, database handle, schema, worker id, extensions, task details, and time
mode. It then runs the handler future:
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
// use ctx and the deserialized payload here
Ok::<(), String>(())
}
}
The execution path records task duration for successful jobs, catches panics, and watches the worker shutdown signal. If shutdown is requested, the worker waits for the configured grace period. A task that has not finished by then is treated as aborted and goes through the recovery release path.
Completion and Failure
Successful jobs are completed by deleting the job row. If the job belongs to a
queue, completion also clears locked_by and locked_at on the queue row, but
only when that queue is locked by the same worker id.
Failed jobs are updated instead of deleted:
- last_error is set to the persisted error message.
- run_at is moved forward by exponential backoff based on attempts.
- locked_by and locked_at are cleared.
- a replacement payload may be written when the handler returned one.
- queued jobs also unlock their queue row.
The worker decides whether a failure will retry by comparing attempts with
max_attempts. Retryable failures emit job-fail hooks; exhausted jobs emit
permanent-failure hooks. The SQL update has the same shape in both cases: a
permanently failed job stays in the table, unlocked, with its last error
recorded, and the schema's availability rules stop treating it as runnable once
attempts reaches max_attempts.
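The text does not give the backoff formula. The sketch below assumes the formula used by the Node.js Graphile Worker, where exp(min(attempts, 10)) seconds are added to the run time. It also assumes a failure retries while attempts < max_attempts. Both are assumptions to check against the crate's fail_job SQL.

```rust
use std::time::Duration;

/// Retry delay after a failed attempt, assuming exp(min(attempts, 10)) seconds.
/// Capping the exponent keeps the delay at roughly 6 hours at most.
fn retry_delay(attempts: u32) -> Duration {
    Duration::from_secs_f64((attempts.min(10) as f64).exp())
}

/// Assumed retry rule: retry while attempts is still below max_attempts.
fn will_retry(attempts: u32, max_attempts: u32) -> bool {
    attempts < max_attempts
}
```

Because attempts is incremented when the job is fetched, a first failure would use attempts = 1, a delay of about 2.7 seconds under this assumption.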
Completion and ordinary failures can be batched. The completion and failure batchers receive release requests through bounded channels, flush them after the configured delay, and emit hooks only for rows that were actually persisted. If a batcher is already closed, the worker falls back to direct persistence for that job.
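The hand-off and fallback can be modeled with std primitives. In this sketch a sync_channel stands in for the bounded batcher channel, a sleep stands in for the configured flush delay, and a Release enum stands in for the persistence step. None of these names come from the crate.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SendError, SyncSender};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Release {
    /// Queued for the batcher's next flush.
    Batched(u64),
    /// Batcher closed, so the job must be persisted on its own.
    Direct(u64),
}

/// Hand a finished job id to the batcher, falling back to direct persistence
/// when the batcher's receiving side has already shut down.
fn release(batcher: &SyncSender<u64>, job_id: u64) -> Release {
    match batcher.send(job_id) {
        Ok(()) => Release::Batched(job_id),
        // SendError hands the value back, so nothing is lost on fallback.
        Err(SendError(job_id)) => Release::Direct(job_id),
    }
}

/// Wait for the flush delay, then take everything queued so far as one batch.
fn flush(batcher: &Receiver<u64>, delay: Duration) -> Vec<u64> {
    thread::sleep(delay);
    batcher.try_iter().collect()
}
```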
Shutdown and Recovery
Shutdown aborts are intentionally not normal task failures. When a running job is interrupted by the shutdown grace-period timeout, the worker applies job recovery instead of failure backoff:
- attempts is decremented with a floor of zero.
- locked_by and locked_at are cleared.
- run_at is delayed by interrupted_job_retry_delay when configured.
- queued jobs also unlock their queue row.
- interruption hooks are emitted when recovery handled the job.
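The recovery rules amount to a few field updates. LockedJob and return_interrupted are hypothetical. The sketch assumes the configured delay is measured from the moment of recovery and that run_at is left unchanged when no delay is configured.

```rust
/// Simplified locked row; times are plain seconds.
struct LockedJob {
    attempts: u32,
    locked_by: Option<String>,
    locked_at: Option<u64>,
    run_at: u64,
}

/// Return an interrupted job without charging it an attempt.
fn return_interrupted(job: &mut LockedJob, now: u64, retry_delay_secs: Option<u64>) {
    // Undo the attempt consumed when the job was fetched, never going below zero.
    job.attempts = job.attempts.saturating_sub(1);
    job.locked_by = None;
    job.locked_at = None;
    // Delay visibility only when a retry delay is configured.
    if let Some(delay) = retry_delay_secs {
        job.run_at = now + delay;
    }
}
```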
Automatic worker recovery is separate and opt-in. When enabled, workers record heartbeats, and the recovery sweeper finds stale worker ids or orphaned locks. Recovered jobs are returned to the queue through the same recovery semantics: the attempt consumed by fetching the job is reversed, locks are cleared, and a recovery delay can be applied before the job is visible again.
The recovery SQL layer can also fetch locked jobs for a set of worker ids and
call the schema's recover_dead_worker_jobs function to recover jobs owned by
dead workers.
Workspace Layout
The workspace keeps behavior separated by responsibility:
src/
lib.rs crate exports and top-level module structure
runner/ worker lifecycle, job loop, execution, release
batcher/ completion and failure batching
streams/ job signal and job fetch streams
crates/
graphile-worker-queries/
src/add_job/ SQL wrappers for inserting jobs
src/get_job.rs single-job locking fetch
src/batch_get_jobs.rs
batched locking fetch for LocalQueue
src/complete_job.rs completion persistence
src/fail_job/ failure persistence
src/return_jobs/ recovery and interrupted-job return
src/recover_workers.rs
stale-worker recovery queries
This layout keeps most PostgreSQL statements in
graphile-worker-queries, while src/runner owns the orchestration decisions:
when to fetch, how to execute handlers, which release path to use, and when to
flush background components.
Tasks and Payloads
Tasks are the typed boundary between queued JSON jobs and your Rust code. A task
is a Rust payload type that implements TaskHandler; Graphile Worker RS uses the
task identifier stored with the job to select the handler, deserializes the job
payload into that type, and then calls run.
TaskHandler
A normal task handler needs four pieces:
- a payload type that implements Serialize and Deserialize
- a globally unique IDENTIFIER
- an async run method
- a registered job definition on the worker
use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
subject: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Sending email to {} with subject: {}", self.to, self.subject);
Ok::<(), String>(())
}
}
run receives self as the deserialized payload. Returning () marks the job as
complete. Returning Result<(), E> marks Ok(()) as complete and Err(error) as
failed, using the error's debug representation.
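The return-type rules can be modeled with a small trait. IntoOutcome is a hypothetical stand-in for IntoTaskHandlerResult, whose real definition is not shown here. The sketch only encodes the cases this paragraph lists.

```rust
use std::fmt::Debug;

/// Hypothetical stand-in: turn a handler's return value into
/// "complete" (Ok) or "failed with this message" (Err).
trait IntoOutcome {
    fn into_outcome(self) -> Result<(), String>;
}

/// Returning () always completes the job.
impl IntoOutcome for () {
    fn into_outcome(self) -> Result<(), String> {
        Ok(())
    }
}

/// Ok(()) completes; Err(e) fails with e's Debug representation.
impl<E: Debug> IntoOutcome for Result<(), E> {
    fn into_outcome(self) -> Result<(), String> {
        self.map_err(|error| format!("{error:?}"))
    }
}
```

Because Debug is used in this sketch, a string error keeps its surrounding quotes in the failure message.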
Register the handler before running the worker:
let worker = WorkerOptions::default()
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
For reusable modules, expose definitions instead of making callers know every type:
use graphile_worker::{JobDefinition, TaskHandler};
pub fn jobs() -> [JobDefinition; 2] {
[SendEmail::definition(), SendDailyReport::definition()]
}
Identifiers
IDENTIFIER is the string stored with the job. It must be unique across all task
types registered by your application, because the worker uses it to match queued
jobs to handlers.
Prefer stable, descriptive, lowercase names:
impl TaskHandler for ProcessPayment {
const IDENTIFIER: &'static str = "process_payment";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
Ok::<(), String>(())
}
}
When adding typed jobs, the identifier comes from the task type:
helpers
.add_job(
SendEmail {
to: "alice@example.com".into(),
subject: "Welcome!".into(),
},
JobSpecBuilder::new().build(),
)
.await?;
When adding raw jobs, you provide the identifier and JSON payload yourself. This is useful for heterogeneous batches, but it skips the compile-time connection between payload type and handler:
utils
.add_raw_jobs(&[
RawJobSpec {
identifier: "send_email".into(),
payload: serde_json::json!({
"to": "dave@example.com",
"subject": "Notification"
}),
spec: JobSpec::default(),
},
RawJobSpec {
identifier: "process_payment".into(),
payload: serde_json::json!({ "user_id": 123, "amount": 50 }),
spec: JobSpec::default(),
},
])
.await?;
Payload Serialization
Task payloads are serialized to JSON when jobs are added and deserialized from JSON when jobs run. The implementing type is the payload shape:
#[derive(Deserialize, Serialize)]
struct SayHello {
message: String,
}
If deserialization fails, the task is treated as failed and the deserialization
error is reported as the failure reason. Keep payload structs explicit and avoid
depending on data that is not in the job payload unless it comes from
WorkerContext.
Batch insertion can still be type-safe when every job has the same task type:
let spec = JobSpec::default();
let emails = vec![
(
SendEmail {
to: "alice@example.com".into(),
subject: "Welcome!".into(),
},
&spec,
),
(
SendEmail {
to: "bob@example.com".into(),
subject: "Welcome!".into(),
},
&spec,
),
];
utils.add_jobs::<SendEmail>(&emails).await?;
Use job specs for scheduling, priority, and other queue behavior. See Scheduling and Queues for the surrounding job controls.
Batch Handlers
BatchTaskHandler is for one database job whose payload is a JSON array of item
payloads. Self is the item type, not Vec<Self>.
use graphile_worker::{
BatchTaskHandler, IntoBatchTaskHandlerResult, JobSpecBuilder, WorkerContext,
};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct PendingNotification {
user_id: String,
message_id: String,
}
impl BatchTaskHandler for PendingNotification {
const IDENTIFIER: &'static str = "send_notifications";
async fn run_batch(
items: Vec<Self>,
_ctx: WorkerContext,
) -> impl IntoBatchTaskHandlerResult {
let mut results = Vec::with_capacity(items.len());
for item in items {
results.push(
send_notification(item)
.await
.map_err(|error| error.to_string()),
);
}
results
}
}
Register batch handlers with define_batch_job and add batch jobs with
add_batch_job:
let worker = WorkerOptions::default()
.define_batch_job::<PendingNotification>()
.pg_pool(pg_pool)
.init()
.await?;
worker
.create_utils()
.add_batch_job(
vec![
PendingNotification {
user_id: "1".into(),
message_id: "a".into(),
},
PendingNotification {
user_id: "1".into(),
message_id: "b".into(),
},
],
JobSpecBuilder::new().build(),
)
.await?;
A batch handler can return:
- () for complete success
- Result<(), E> to complete or fail the whole batch
- Vec<Result<(), E>> for per-item results
- BatchTaskResult<E> directly
For per-item results, the vector must have the same length and order as the input items. Failed positions are retried with their original payload values; successful positions are removed from the retry payload. If the stored payload is not a JSON array, or the result count does not match the item count, the batch job fails.
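The per-item rule can be written as a pure function over the input items and the handler's results. BatchOutcome and apply_item_results are hypothetical names. The sketch skips the JSON-array check because it starts from already deserialized items.

```rust
/// What happens to a batch job after its handler returns per-item results.
#[derive(Debug, PartialEq)]
enum BatchOutcome<T> {
    Complete,
    Retry(Vec<T>),
    Invalid(String),
}

fn apply_item_results<T, E>(items: Vec<T>, results: Vec<Result<(), E>>) -> BatchOutcome<T> {
    // Results must line up one-to-one with the input items.
    if items.len() != results.len() {
        return BatchOutcome::Invalid(format!(
            "expected {} results, got {}",
            items.len(),
            results.len()
        ));
    }
    // Keep the original payload for every failed position, in input order;
    // successful positions drop out of the retry payload.
    let failed: Vec<T> = items
        .into_iter()
        .zip(results)
        .filter_map(|(item, result)| result.is_err().then_some(item))
        .collect();
    if failed.is_empty() {
        BatchOutcome::Complete
    } else {
        BatchOutcome::Retry(failed)
    }
}
```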
WorkerContext
Both TaskHandler::run and BatchTaskHandler::run_batch receive a
WorkerContext. Use the payload fields on self for job input, and use the
context for worker-provided data such as job metadata and registered extensions.
#[derive(Clone, Debug)]
struct AppState {
api_key: String,
}
impl TaskHandler for ProcessUserTask {
const IDENTIFIER: &'static str = "process_user";
async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
let app_state = ctx.get_ext::<AppState>().unwrap();
call_service(&app_state.api_key, &self.user_id).await?;
Ok::<(), String>(())
}
}
Keep durable job input in the payload. Keep process-local resources, clients,
counters, and configuration in extensions reached through WorkerContext.
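Type-keyed extension maps are a common Rust pattern and a plausible way to picture what get_ext::<T>() looks up. This TypeId map is a std-only sketch, not the crate's implementation. It returns None in the case where the example above would panic on unwrap() because an extension is missing.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical type-keyed store: at most one value per Rust type.
#[derive(Default)]
struct Extensions {
    values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl Extensions {
    /// Store a value, replacing any earlier value of the same type.
    fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.values.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Look a value up by its type; None if nothing of that type was stored.
    fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.values
            .get(&TypeId::of::<T>())
            .and_then(|value| value.downcast_ref::<T>())
    }
}
```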
Versioning Payloads
Queued jobs may outlive the code version that created them. Treat each payload as a stored API contract:
- keep identifiers stable unless you intentionally migrate producers and workers
- add optional fields when older queued jobs may not contain new data
- prefer explicit field names over positional or loosely shaped JSON
- avoid removing or renaming required fields while old jobs can still run
- use a new identifier when the meaning of a task changes incompatibly
These rules matter for normal jobs and batch jobs. Batch jobs store an array of payload items, so each item shape needs the same compatibility care as a normal task payload.
Worker Lifecycle
A worker is built with WorkerOptions, initialized with init(), and then
started with either run() or run_once(). The builder collects configuration;
init() turns that configuration into a ready Worker.
let worker = graphile_worker::WorkerOptions::default()
.concurrency(5)
.schema("graphile_worker")
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?;
worker.run().await?;
Worker::options() is an equivalent entry point if you prefer to start from the
worker type:
let worker = graphile_worker::Worker::options()
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?;
Initialization
WorkerOptions::init() performs the setup work that must happen before jobs can
be processed:
- uses the supplied database connection, or creates one from database_url
- runs the database migrations for the configured schema
- registers the configured task identifiers in the database
- creates a random worker id with the graphile_worker_ prefix
- applies defaults such as a one-second poll interval and CPU-count concurrency
- prepares shutdown, recovery, hook, queue, and batcher state
init() returns an error if the worker cannot connect to the database, no
database URL or pool was supplied, migrations fail, task registration fails, or
local queue configuration is invalid.
Database Configuration
A worker needs a PostgreSQL database before init() can succeed. You can pass a
URL and let Graphile Worker RS create the pool:
let worker = graphile_worker::WorkerOptions::default()
.database_url("postgres://postgres:password@localhost/mydb")
.max_pg_conn(20)
.define_job::<SendEmail>()
.init()
.await?;
Or you can pass an existing database connection. With the SQLx driver, pg_pool
is a convenience wrapper:
let pg_pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect("postgres://postgres:password@localhost/mydb")
.await?;
let worker = graphile_worker::WorkerOptions::default()
.pg_pool(pg_pool)
.define_job::<SendEmail>()
.init()
.await?;
If both an existing database connection and a URL are provided, the existing
connection is used. max_pg_conn only applies when the worker creates a pool
from database_url.
The schema defaults to graphile_worker. Set a custom schema when you want to
isolate worker tables:
let worker = graphile_worker::WorkerOptions::default()
.schema("my_app_worker")
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?;
Task Registration
Workers only run tasks registered on the builder. The common path is
define_job::<T>(), where T implements TaskHandler:
use graphile_worker::{
IntoTaskHandlerResult, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Sending email to {}", self.to);
}
}
let worker = WorkerOptions::default()
.define_job::<SendEmail>()
.database_url("postgres://postgres:password@localhost/mydb")
.init()
.await?;
For reusable modules, collect definitions and register them with define_jobs.
Batch handlers can be registered with define_batch_job.
Jobs with forbidden flags are skipped by that worker:
let worker = graphile_worker::WorkerOptions::default()
.add_forbidden_flag("high_memory")
.define_job::<SendEmail>()
.database_url("postgres://postgres:password@localhost/mydb")
.init()
.await?;
Concurrency and Polling
concurrency controls the maximum number of jobs the worker processes at the
same time. If it is not set, the default is the number of logical CPUs. Passing
0 panics.
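The default-and-validate rule can be written as a small function. resolve_concurrency is a hypothetical helper. It uses std::thread::available_parallelism as the CPU count, which may differ from the source the crate uses; for example, it can report fewer CPUs inside a container with a CPU quota.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// An explicit value must be non-zero; otherwise fall back to the CPU count.
fn resolve_concurrency(requested: Option<usize>) -> usize {
    match requested {
        Some(0) => panic!("concurrency must be greater than zero"),
        Some(n) => n,
        // available_parallelism can fail on some platforms; fall back to 1.
        None => thread::available_parallelism()
            .map(NonZeroUsize::get)
            .unwrap_or(1),
    }
}
```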
let worker = graphile_worker::WorkerOptions::default()
.concurrency(10)
.poll_interval(std::time::Duration::from_millis(500))
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?;
poll_interval controls how often the worker checks PostgreSQL for work when
notification delivery is not enough, or when jobs are scheduled for the future.
The default is one second.
Running Continuously
Worker::run() is the normal long-running mode for application workers. It:
- registers the worker for recovery bookkeeping when recovery support is enabled
- emits the worker start hook
- starts the crontab scheduler and the job runner together
- runs until its shutdown signal resolves or an error stops the runner
- waits for completion and failure batchers to shut down
- stops recovery tasks
- emits the worker shutdown hook
- deregisters the worker
graphile_worker::WorkerOptions::default()
.concurrency(5)
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?
.run()
.await?;
Use run() for service processes, web application sidecars, and deployments
where the worker should keep listening for new jobs.
Running Once
Worker::run_once() processes currently available jobs and then returns. It
uses the same configured concurrency and task handlers, but it does not start
the continuous lifecycle used by run().
This is useful for scripts, tests, local maintenance commands, and one-shot workers:
let worker = graphile_worker::WorkerOptions::default()
.concurrency(2)
.schema("example_simple_worker")
.define_job::<SayHello>()
.pg_pool(pg_pool)
.init()
.await?;
let helpers = worker.create_utils();
for i in 0..10 {
helpers
.add_job(
SayHello {
message: format!("world {}", i),
},
graphile_worker::JobSpec::default(),
)
.await?;
}
worker.run_once().await?;
When a run-once job belongs to a queue, the worker checks for another job after releasing it so queued work can continue in order.
Shutdown
Each worker has an internal shutdown signal. By default, Graphile Worker RS also
listens for OS shutdown signals such as SIGINT and SIGTERM. You can request
shutdown from application code:
worker.request_shutdown();
If your host application already owns shutdown handling, pass a custom signal and disable the built-in OS listeners:
let worker = graphile_worker::WorkerOptions::default()
.listen_os_shutdown_signals(false)
.shutdown_signal(async {
wait_for_application_shutdown().await;
})
.shutdown_grace_period(std::time::Duration::from_secs(10))
.shutdown_interrupted_job_retry_delay(std::time::Duration::from_secs(30))
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?;
worker.run().await?;
The same settings can be supplied as a WorkerShutdownConfig with
worker_shutdown(config).
Dropping a Worker also notifies the internal shutdown signal as a safety net,
but normal applications should shut down explicitly through the configured
signal or request_shutdown().
Worker IDs and the Node.js Version
Graphile Worker RS is compatible with the original Node.js Graphile Worker for
the queue schema, but worker identity is different. In the Node.js version, each
process has its own worker_id. In the Rust version, init() creates one
worker id for the Worker instance, and jobs are processed concurrently by the
enabled async runtime using that same worker id.
Scheduling Model
Graphile Worker RS schedules work by inserting rows into PostgreSQL. A job
becomes runnable when it is available, unlocked, below its retry limit, and its
run_at timestamp is due.
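The four conditions can be written as one predicate. JobRow is a simplified, hypothetical row type. The real fetch query also checks forbidden flags and named-queue locks, which are omitted here.

```rust
/// Simplified job row with the fields that decide runnability.
struct JobRow {
    is_available: bool,
    locked_by: Option<String>,
    attempts: u32,
    max_attempts: u32,
    run_at: i64, // unix seconds
}

/// Runnable = available, unlocked, below its retry limit, and due.
fn is_runnable(job: &JobRow, now: i64) -> bool {
    job.is_available
        && job.locked_by.is_none()
        && job.attempts < job.max_attempts
        && job.run_at <= now
}
```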
Most Rust applications schedule jobs through WorkerUtils:
let utils = worker.create_utils();
utils.add_job(
SendEmail {
to: "user@example.com".to_string(),
subject: "Welcome".to_string(),
body: "Thanks for signing up.".to_string(),
},
JobSpec::default(),
).await?;
Use add_raw_job when the task identifier is only known at runtime:
utils.add_raw_job(
"send_email",
serde_json::json!({
"to": "user@example.com",
"subject": "Welcome"
}),
JobSpec::default(),
).await?;
JobSpec
JobSpec contains the scheduling metadata sent to the database:
use chrono::Utc;
use graphile_worker::{JobKeyMode, JobSpecBuilder};
let spec = JobSpecBuilder::new()
.queue_name("user:123")
.run_at(Utc::now() + chrono::Duration::minutes(5))
.max_attempts(5)
.job_key("welcome-email:123")
.job_key_mode(JobKeyMode::Replace)
.priority(-10)
.flags(vec!["email".to_string()])
.build();
The database fills omitted values:
| Field | Meaning | Default |
|---|---|---|
| queue_name | Optional queue used to serialize related jobs. | No queue |
| run_at | Earliest timestamp at which the job may run. | now() |
| max_attempts | Maximum attempts before the job is permanently unavailable. | 25 |
| job_key | Optional deduplication key. | No key |
| job_key_mode | How an existing keyed job is handled. | Replace |
| priority | Sort key for runnable jobs. Lower numbers run first. | 0 |
| flags | Optional flags stored on the job. | No flags |
When WorkerUtils is configured with with_use_local_time(true), Rust passes
the application's current UTC time for omitted run_at values. Otherwise the
database uses PostgreSQL now().
Immediate And Future Jobs
JobSpec::default() schedules a job immediately. To delay a job, set run_at:
use chrono::Utc;
use graphile_worker::JobSpecBuilder;
let spec = JobSpecBuilder::new()
.run_at(Utc::now() + chrono::Duration::hours(1))
.build();
utils.add_job(SendEmail { /* ... */ }, spec).await?;
Workers only fetch jobs whose run_at is less than or equal to the current
time used by the worker query.
Priority
Runnable jobs are selected in ascending priority order, then ascending run_at.
The default priority is 0, so negative values run before default-priority jobs
and positive values run later.
let urgent = JobSpecBuilder::new()
.priority(-10)
.build();
let normal = JobSpec::default();
Priority is not a separate queue. It only affects ordering among jobs that are otherwise available to the worker.
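As an illustration of that ordering, the sketch below sorts hypothetical in-memory jobs by priority, then run_at. The final tie-break on id is an assumption borrowed from the ORDER BY in the jobs view query shown under Database Schema. QueuedJob and order_runnable are not part of the graphile_worker API.

```rust
/// Hypothetical in-memory job used only to illustrate ordering.
#[derive(Debug, Clone, PartialEq)]
struct QueuedJob {
    id: i64,
    priority: i16,
    run_at: i64,
}

/// Lowest priority first, then earliest run_at, then lowest id.
fn order_runnable(jobs: &mut [QueuedJob]) {
    jobs.sort_by_key(|j| (j.priority, j.run_at, j.id));
}

fn main() {
    let mut jobs = vec![
        QueuedJob { id: 1, priority: 0, run_at: 10 },
        QueuedJob { id: 2, priority: -10, run_at: 20 },
        QueuedJob { id: 3, priority: 5, run_at: 0 },
        QueuedJob { id: 4, priority: 0, run_at: 5 },
    ];
    order_runnable(&mut jobs);
    let ids: Vec<i64> = jobs.iter().map(|j| j.id).collect();
    println!("{:?}", ids); // [2, 4, 1, 3]
}
```

The urgent job (priority -10) runs first even though it is due later, and job 3 runs last despite having the earliest run_at.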
Job Keys
A job_key deduplicates jobs. Adding a job whose key matches an existing available
job updates that row instead of creating a new one.
use graphile_worker::{JobKeyMode, JobSpecBuilder};
let spec = JobSpecBuilder::new()
.job_key("sync-user:123")
.job_key_mode(JobKeyMode::Replace)
.build();
utils.add_job(SyncUser { user_id: 123 }, spec).await?;
The supported JobKeyMode values are:
| Mode | Behavior |
|---|---|
| Replace | Replace the existing available job's queue, task, payload, run_at, max_attempts, priority, and flags. Reset attempts and clear the last error. |
| PreserveRunAt | Replace the job, but keep the existing run_at when the existing job has not been attempted yet. |
| UnsafeDedupe | If the key already exists, only increment the revision and update updated_at; the existing payload and timing are left in place. |
For a keyed job that already exists but is no longer available, the database clears the old key and marks that old row as fully attempted before inserting the replacement.
Batch scheduling supports Replace and PreserveRunAt. UnsafeDedupe is
rejected for add_jobs and add_raw_jobs. In a batch, if any job uses
PreserveRunAt, the preserve-run-at behavior is applied to keyed conflicts in
that batch.
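The three modes can be modelled on a single in-memory row to make the differences concrete. This is a simplified sketch, not the SQL that graphile_worker runs. KeyedJob, KeyMode, and apply_keyed_add are hypothetical, only a subset of the replaced fields is tracked, and revision is only touched in the UnsafeDedupe branch because the table above does not say how the other modes treat it.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum KeyMode {
    Replace,
    PreserveRunAt,
    UnsafeDedupe,
}

/// Hypothetical subset of an existing, still-available keyed job.
#[derive(Debug, Clone, PartialEq)]
struct KeyedJob {
    payload: String,
    run_at: i64,
    attempts: i16,
    last_error: Option<String>,
    revision: i32,
}

/// Apply a new add_job call that carries the same key as `existing`.
fn apply_keyed_add(existing: &mut KeyedJob, payload: &str, run_at: i64, mode: KeyMode) {
    match mode {
        KeyMode::Replace | KeyMode::PreserveRunAt => {
            // PreserveRunAt keeps the old run_at only if the job was never attempted.
            let keep_run_at = mode == KeyMode::PreserveRunAt && existing.attempts == 0;
            if !keep_run_at {
                existing.run_at = run_at;
            }
            existing.payload = payload.to_string();
            existing.attempts = 0;
            existing.last_error = None;
        }
        KeyMode::UnsafeDedupe => {
            // Payload and timing stay as they were; only the revision moves.
            existing.revision += 1;
        }
    }
}

fn main() {
    let mut job = KeyedJob { payload: "v1".into(), run_at: 100, attempts: 0, last_error: None, revision: 0 };
    apply_keyed_add(&mut job, "v2", 500, KeyMode::PreserveRunAt);
    println!("{:?}", job);
}
```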
Queues
queue_name groups jobs that must not run in parallel. Jobs with the same queue
name share a row in the private job queue table, and the worker locks that queue
when it fetches a queued job.
let spec = JobSpecBuilder::new()
.queue_name("account:123")
.build();
utils.add_job(UpdateAccount { id: 123 }, spec).await?;
Use queues for per-user, per-account, or per-resource workflows where ordering or mutual exclusion matters. Jobs without a queue are not serialized by this mechanism.
Batch Scheduling
Use add_jobs or add_raw_jobs to insert many jobs in one database round trip:
let spec = JobSpec::default();
utils.add_jobs::<SendEmail>(&[
(SendEmail { /* ... */ }, &spec),
(SendEmail { /* ... */ }, &spec),
]).await?;
For batch task handlers, add_batch_job stores a JSON array in a single job.
When a keyed batch job is replaced and both the existing and new payloads are
arrays, the database appends the new array to the existing array.
Cron
Cron schedules are configured on WorkerOptions, not by manually looping in
application code. The typed API builds cron definitions in Rust:
use graphile_worker::{Cron, CrontabFill, WorkerOptions};
let worker = WorkerOptions::default()
.define_job::<SendDailyReport>()
.with_cron(
Cron::daily_at::<SendDailyReport>(8, 0)?
.fill(CrontabFill::hours(1)),
)
.init()
.await?;
Crontab text is also supported:
let worker = WorkerOptions::default()
.define_job::<SendDailyReport>()
.with_cron("0 8 * * * send_daily_report")?
.init()
.await?;
On each cron tick, the runner checks which crontabs match the current local
timestamp and schedules matching jobs through the same database add_job
function used by one-off jobs. Cron jobs can carry payload, queue, run_at,
max_attempts, priority, job key, and job key mode metadata from the crontab
definition. If the payload is a JSON object, the runner adds a _cron object
with the scheduled timestamp and whether the job was backfilled.
The runner records known crontabs and their last execution timestamp so a cron tick is only scheduled once for a given crontab execution.
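The once-per-execution rule can be sketched with an in-memory map from crontab identifier to last execution timestamp. This only illustrates the bookkeeping described above. KnownCrontabs and claim_tick are hypothetical names, and this section does not show where or how the runner actually persists its known crontabs.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the runner's record of known crontabs.
#[derive(Default)]
struct KnownCrontabs {
    last_execution: HashMap<String, i64>,
}

impl KnownCrontabs {
    /// Returns true (and records the tick) only when `tick` is newer than the
    /// crontab's last recorded execution, so each tick is scheduled once.
    fn claim_tick(&mut self, identifier: &str, tick: i64) -> bool {
        match self.last_execution.get(identifier) {
            Some(&last) if last >= tick => false,
            _ => {
                self.last_execution.insert(identifier.to_string(), tick);
                true
            }
        }
    }
}

fn main() {
    let mut known = KnownCrontabs::default();
    println!("first: {}", known.claim_tick("send_daily_report", 1_000));
    println!("repeat: {}", known.claim_tick("send_daily_report", 1_000));
}
```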
Retry Metadata
The worker increments attempts when it locks a job to run it. A job remains
available only while attempts < max_attempts.
When a job fails, the failure query stores last_error, unlocks the job, and
sets a future run_at using exponential backoff:
greatest(now(), run_at) + exp(least(attempts, 10)) seconds
This means the retry delay grows with the attempt count. The exponent never
exceeds 10, so the added delay is capped at e^10 ≈ 22,026 seconds (about 6.1
hours). Once attempts reaches max_attempts, the job is no longer available for
normal fetching.
Existing jobs can be changed with management utilities such as rescheduling.
RescheduleJobOptions can update run_at, priority, attempts, and
max_attempts for selected job ids.
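The delay term of the backoff formula above is easy to tabulate. The sketch below computes exp(least(attempts, 10)) seconds as a std::time::Duration. retry_delay is a hypothetical helper, not a graphile_worker function. Because attempts is incremented when a job is locked, the first failure sees attempts = 1 and waits about 2.7 seconds on top of greatest(now(), run_at).

```rust
use std::time::Duration;

/// Delay added after a failed attempt: exp(least(attempts, 10)) seconds.
fn retry_delay(attempts: u32) -> Duration {
    // The exponent is capped at 10, so delays stop growing after attempt 10.
    let exponent = attempts.min(10) as f64;
    Duration::from_secs_f64(exponent.exp())
}

fn main() {
    for attempts in [1, 2, 5, 10, 25] {
        println!("attempt {attempts}: {:.1}s", retry_delay(attempts).as_secs_f64());
    }
}
```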
Queues and Concurrency
Graphile Worker RS uses PostgreSQL as the source of truth for runnable jobs, and worker concurrency decides how many jobs a worker may execute at the same time. Queues add another layer of control: they let you group jobs by workload and serialize jobs that share the same queue name.
Use queues when one class of work should not interfere with another, or when a sequence of jobs must not run in parallel.
Assigning Jobs to Queues
Jobs can be assigned to a queue with JobSpec::queue_name. The field is
optional; when it is not set, the job is added without an explicit queue name.
use graphile_worker::JobSpecBuilder;
worker
.create_utils()
.add_job(
SendEmail {
to: "user@example.com".to_string(),
},
JobSpecBuilder::new()
.queue_name("mail")
.build(),
)
.await?;
The same option is available when building JobSpec directly:
use graphile_worker::JobSpec;
let spec = JobSpec {
queue_name: Some("exports".to_string()),
..Default::default()
};
Queue names are data, not task identifiers. Different task handlers can share a queue when they must be serialized together, and the same task handler can use different queues for different tenants, accounts, or workload classes.
Serial Queues
A queue is useful when jobs for the same resource must run one at a time. For example, jobs that update the same external account can all use a queue derived from that account id:
let queue_name = format!("account:{}", account_id);
worker
.create_utils()
.add_job(
SyncAccount { account_id },
JobSpecBuilder::new()
.queue_name(queue_name)
.build(),
)
.await?;
While a queued job is in progress, Graphile Worker RS records the queue lock in
the database: the project's tests assert that a named queue has a locked_at
timestamp, a locked_by worker id, and a job count while its job is running. When
the job finishes, the completed job is removed and the queue can be used by later work.
This makes queue names a practical serialization primitive:
- Use one queue name for all jobs that must run in order.
- Use distinct queue names for jobs that may run at the same time.
- Keep queue names stable and deterministic when they represent a shared resource.
Worker Concurrency
Worker concurrency controls how many jobs a worker can execute at once.
let worker = graphile_worker::WorkerOptions::default()
.concurrency(5)
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
With higher concurrency, independent queues can run in parallel. The
concurrency tests enqueue five jobs into five different queues and configure
the worker with concurrency(10); all five jobs are picked up and left
in progress at the same time.
Without increasing concurrency, work is effectively drained one job at a time in
the tested run_once path. That is useful for small deployments or jobs that
should not overlap, but it means one slow job can delay unrelated work.
Workload Isolation
Queues and concurrency solve different parts of workload isolation.
Use queue names to protect resources:
let spec = JobSpecBuilder::new()
.queue_name(format!("project:{}", project_id))
.build();
Use worker concurrency to decide how many independent jobs the process can run:
let worker = graphile_worker::WorkerOptions::default()
.concurrency(8)
.define_job::<RenderPreview>()
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
Common patterns:
- Per-tenant queues: tenant:42
- Per-project queues: project:abc123
- Per-external-resource queues: stripe:acct_123
- Shared workload queues: mail, exports, webhooks
Choose the narrowest queue name that protects the resource you care about. A single global queue is simple, but it serializes everything behind the slowest job in that queue. Very fine-grained queue names allow more parallelism, but they only protect resources that are named consistently.
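One way to keep queue names consistent is to derive them from a single application-side type instead of formatting strings at each call site. QueueKey below is a hypothetical helper, not part of graphile_worker. It only produces the String values used in the patterns above.

```rust
use std::fmt;

/// Hypothetical enum that centralises queue naming so every call site
/// derives the same name for the same resource.
enum QueueKey<'a> {
    Tenant(u64),
    Project(&'a str),
    StripeAccount(&'a str),
    Mail,
}

impl fmt::Display for QueueKey<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            QueueKey::Tenant(id) => write!(f, "tenant:{id}"),
            QueueKey::Project(id) => write!(f, "project:{id}"),
            QueueKey::StripeAccount(id) => write!(f, "stripe:{id}"),
            QueueKey::Mail => f.write_str("mail"),
        }
    }
}

fn main() {
    let name = QueueKey::Tenant(42).to_string();
    println!("{name}"); // tenant:42
}
```

The resulting String can be passed to JobSpecBuilder::queue_name(...) in the same way as the format! examples earlier in this section.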
Local Queue
The local queue is an in-worker cache of jobs fetched from PostgreSQL. It exists to reduce polling and provide low-latency processing while the database remains the durable source of truth.
use graphile_worker::LocalQueueConfig;
let worker = graphile_worker::WorkerOptions::default()
.concurrency(3)
.local_queue(LocalQueueConfig::default().with_size(10))
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
LocalQueueConfig includes:
- size: maximum number of jobs each local queue may fetch and hold at once. The default is 100.
- ttl: how long locally fetched jobs may stay unclaimed before being returned to the database. The default is five minutes.
- refetch_delay: an optional delay strategy used when a fetch returns fewer jobs than requested.
- queue_count: number of independent local queues to run inside this worker. The default is 1.
When queue_count is greater than one, size applies to each local queue. For
example, size = 3 and queue_count = 4 allow up to twelve jobs to be locked
locally across the worker. Tests also assert that queue_count is capped by
worker concurrency, because each local queue needs at least one worker draining
it.
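The capacity rule above can be written as a one-line calculation. max_locally_locked is a hypothetical helper, and treating the cap as min(queue_count, concurrency) is an assumption based on the statement that queue_count is capped by worker concurrency.

```rust
/// Upper bound on jobs one worker can hold locally: `size` applies per local
/// queue, and the number of local queues is capped by worker concurrency.
fn max_locally_locked(size: usize, queue_count: usize, concurrency: usize) -> usize {
    let effective_queues = queue_count.min(concurrency);
    size * effective_queues
}

fn main() {
    // The example from the text: size = 3, queue_count = 4, concurrency 5.
    println!("{}", max_locally_locked(3, 4, 5)); // 12
}
```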
let local_queue = LocalQueueConfig::default()
.with_size(3)
.with_queue_count(4);
let worker = graphile_worker::WorkerOptions::default()
.concurrency(5)
.local_queue(local_queue)
.define_job::<SmallFastJob>()
.pg_pool(pg_pool)
.init()
.await?;
Multiple local queues can improve throughput for very small, high-volume jobs
by allowing several fetch batches in parallel. They can also lock more jobs
inside one worker, so keep size lower when increasing queue_count and
benchmark with realistic workloads.
Practical Guidance
Start with worker concurrency and explicit queue names before tuning local queue internals.
- Increase concurrency when unrelated jobs should run in parallel and the database pool, runtime, and downstream services can handle the extra work.
- Add queue_name when jobs for the same tenant, project, account, or external system must not overlap.
- Use separate queue names for unrelated workloads so slow serial work does not block everything else.
- Enable and tune local_queue when polling overhead or very small jobs become a bottleneck.
- Raise queue_count only after measuring; it increases parallel fetch capacity and the number of jobs a worker can hold locally.
Database Schema
Graphile Worker RS stores jobs and coordination state in PostgreSQL. By
default it uses the graphile_worker schema, and migrations create the tables,
types, indexes, functions, and views that the worker runtime needs.
Treat this schema as owned by Graphile Worker RS. Application code should add jobs through the Rust API or the installed SQL functions instead of writing directly to private tables.
Schema Name
The default schema name is graphile_worker. Migration SQL is written with a
schema placeholder and executed against the configured schema, so installations
can use another schema name when the worker is configured that way.
The migration executor replaces this schema placeholder with the configured schema name before it runs each SQL statement:
:GRAPHILE_WORKER_SCHEMA
Most applications can leave the default schema in place.
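The substitution step amounts to a plain text replacement. The sketch below assumes the schema name is a trusted identifier taken from configuration, since it is spliced into SQL text rather than bound as a parameter. render_migration and the sample statement are hypothetical, and the real executor may also quote or escape the identifier.

```rust
/// Placeholder token used in the migration SQL, as documented above.
const SCHEMA_PLACEHOLDER: &str = ":GRAPHILE_WORKER_SCHEMA";

/// Substitute the configured schema into one migration statement.
/// Assumes `schema` is a trusted identifier; it is not escaped here.
fn render_migration(sql: &str, schema: &str) -> String {
    sql.replace(SCHEMA_PLACEHOLDER, schema)
}

fn main() {
    // Made-up statement for illustration only.
    let sql = "select count(*) from :GRAPHILE_WORKER_SCHEMA.migrations;";
    println!("{}", render_migration(sql, "my_app_worker"));
}
```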
Installed Objects
Current migrations install and maintain these main storage tables:
| Object | Purpose |
|---|---|
| _private_jobs | Stores queued jobs, scheduling fields, attempts, locks, flags, keys, and errors. |
| _private_tasks | Stores task identifiers and maps them to internal task ids. |
| _private_job_queues | Stores named queues and queue lock state for serialized queue execution. |
| _private_workers | Stores worker heartbeat metadata used by stale-worker recovery. |
| migrations | Records applied migration ids and whether each migration is breaking. |
The worker also installs SQL functions used by the Rust implementation, including job insertion, completion, failure/rescheduling operations, worker heartbeat, stale-worker listing, and dead-worker recovery functions.
The exact private table layout can change across migrations. Prefer the documented APIs and the installed functions for integration points.
Public Jobs View
The migrations expose a jobs view over the private tables. It joins jobs to
their task identifier and queue name so operators can inspect queue state
without depending on every private table detail.
SELECT
id,
queue_name,
task_identifier,
priority,
run_at,
attempts,
max_attempts,
last_error,
created_at,
updated_at,
key,
locked_at,
locked_by,
revision,
flags
FROM graphile_worker.jobs
ORDER BY priority, run_at, id;
Use this for diagnostics and operational visibility. Avoid updating through the view or relying on it as an application-owned table.
Jobs
A job row tracks the work to perform and the state needed to schedule it:
- the task identifier, stored through _private_tasks;
- the JSON payload;
- optional queue membership, stored through _private_job_queues;
- run_at, priority, attempts, and max_attempts;
- optional key and flags;
- lock fields, error text, revision, and timestamps.
Adding jobs through the worker APIs keeps these related records consistent.
The SQL migrations also install add_job and add_jobs functions, which create
task and queue rows as needed and notify workers that jobs were inserted.
use graphile_worker::{JobSpec, WorkerUtils};
use serde_json::json;
let worker_utils = WorkerUtils::new(database, "graphile_worker");
worker_utils
.add_raw_job(
"send_email",
json!({ "user_id": 42 }),
JobSpec::default(),
)
.await?;
Queues
Jobs may belong to a named queue. Queue rows carry lock state so the worker can serialize jobs within the same named queue while still allowing unrelated queues or unqueued jobs to make progress.
Queue names are an internal scheduling primitive, not a separate application
model. Let add_job or add_jobs create queue rows as needed.
Workers
Workers register heartbeat state in _private_workers. The installed functions
include:
- worker_heartbeat, which inserts or updates a worker heartbeat and optional metadata;
- worker_deregister, which removes a worker row;
- list_stale_workers and list_orphan_locked_workers, which identify workers or locks that need recovery;
- recover_dead_worker_jobs, which clears locks for selected workers and reschedules affected jobs;
- delete_stale_workers, which removes stale worker rows.
These functions are part of the worker recovery machinery. Application code normally configures and runs workers rather than calling them directly.
Migrations
Graphile Worker RS ships its migration SQL with the crate and records applied
migrations in the schema's migrations table. The current migration registry
contains 20 migrations.
Migrations are designed to be repeatable after they have been applied: running
the migrations again leaves existing jobs in place. The migration tests also cover
taking over from a pre-existing migrations table.
Some migrations are marked as breaking in the migration metadata. The current breaking migration ids are:
1, 3, 11, 13, 14, 16
Migration 11 is intentionally cautious around locked jobs. If jobs are locked when that migration runs, the migration fails with a dedicated locked-job error instead of silently changing active job state.
Compatibility
The worker checks migration revision compatibility after applying migrations. If the database contains a breaking migration id newer than the worker knows about, migration aborts instead of letting an older binary run against a future schema. If the newer recorded migration is not marked as breaking, the worker logs a compatibility warning and continues.
That check protects private schema compatibility. It also means deployment order matters when multiple services share the same worker schema: avoid rolling back to a binary that cannot understand the breaking schema version already installed in the database.
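The compatibility rule can be expressed as a small decision function. This is a sketch of the behavior described above, not the worker's code. Compatibility and check_compatibility are hypothetical, the migration ids in the example are illustrative, and the sketch ignores everything except the id and breaking flag of each recorded migration.

```rust
/// Outcome of comparing recorded migrations with those a binary knows.
#[derive(Debug, PartialEq)]
enum Compatibility {
    Ok,
    /// A newer, non-breaking migration exists: log a warning and continue.
    Warn(u32),
    /// A newer, breaking migration exists: abort instead of running.
    Abort(u32),
}

/// `recorded` holds (migration id, is_breaking) pairs found in the database;
/// `latest_known` is the highest migration id compiled into this binary.
fn check_compatibility(latest_known: u32, recorded: &[(u32, bool)]) -> Compatibility {
    // Only migrations this binary has never heard of matter here.
    let newer = recorded.iter().copied().filter(|&(id, _)| id > latest_known);
    let mut newest_non_breaking = None;
    for (id, breaking) in newer {
        if breaking {
            return Compatibility::Abort(id);
        }
        newest_non_breaking = newest_non_breaking.max(Some(id));
    }
    match newest_non_breaking {
        Some(id) => Compatibility::Warn(id),
        None => Compatibility::Ok,
    }
}

fn main() {
    let recorded = [(19, false), (20, true), (21, false)];
    println!("{:?}", check_compatibility(20, &recorded)); // Warn(21)
}
```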
For operational SQL, keep queries conservative:
-- Good for inspection
SELECT id, task_identifier, run_at, attempts, max_attempts, last_error
FROM graphile_worker.jobs
WHERE attempts >= max_attempts
ORDER BY updated_at DESC;
Avoid SQL that inserts, updates, deletes, or assumes private column names in
_private_* tables unless you are working on Graphile Worker RS itself.
Configuration
Graphile Worker RS is configured through WorkerOptions. Start with the few
choices that define the worker's role, then add specialized options only when
the deployment needs them.
Most applications need to answer these questions:
- Which runtime, TLS backend, and database driver should the crate compile with?
- Which PostgreSQL connection and schema should this worker use?
- Which task handlers is this process responsible for?
- How much work should this process run at once?
- Does the application need cron, lifecycle hooks, application state, local queueing, recovery, or custom shutdown behavior?
Minimal Worker
A worker needs a database connection, at least one task definition for the jobs
it should process, and a call to init().
use graphile_worker::WorkerOptions;
let worker = WorkerOptions::default()
.database_url("postgres://user:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?;
worker.run().await?;
init() establishes or reuses the database connection, runs migrations for the
configured schema, registers task details, creates a worker id, and returns a
configured worker.
Decision Map
Use this map to find the option family that matches the decision you are making.
| Decision | Use | Notes |
|---|---|---|
| Compile for Tokio or async-std | Cargo features | Default features enable Tokio, rustls, and SQLx. See Runtime, TLS, and Drivers. |
| Connect with an existing pool | database(...) or pg_pool(...) | An explicit database connection takes precedence over database_url(...). |
| Connect from a URL | database_url(...) and max_pg_conn(...) | max_pg_conn(...) applies only when the worker creates its own pool from a URL. |
| Isolate worker tables | schema(...) | Defaults to the Graphile Worker schema when not set. |
| Register handlers | define_job(...), define_batch_job(...), define_jobs(...) | Use define_jobs(...) when a module exposes reusable job definitions. |
| Limit concurrent execution | concurrency(...) | Defaults to the number of logical CPUs and must be greater than zero. |
| Tune database polling | poll_interval(...) | Defaults to one second. Notifications still provide low-latency wakeups when available. |
| Skip flagged jobs | add_forbidden_flag(...) | Workers with forbidden flags bypass local queueing and fetch directly from the database. |
| Add recurring jobs | with_cron(...) or with_crons(...) | Accepts typed cron values, raw crontabs, or crontab text depending on input. See Cron Jobs. |
| Share application state | add_extension(...) | Extensions are available from task contexts. See Application State and Extensions. |
| Observe or intercept lifecycle events | on(...) or add_plugin(...) | See Lifecycle Hooks. |
| Improve throughput with local buffering | local_queue(...) | Batch-fetches jobs into a local cache. See Local Queue. |
| Batch completion or permanent failure writes | complete_job_batch_delay(...), fail_job_batch_delay(...) | Small delays such as 1-5ms are recommended by the API docs. |
| Recover jobs locked by dead workers | worker_recovery(...) or recovery convenience setters | Recovery is opt-in. See Worker Recovery. |
| Integrate with host shutdown | worker_shutdown(...) or shutdown convenience setters | See Shutdown. |
For a complete option reference, see Worker Options.
Database Configuration
Pass a database connection directly when your application already owns the pool:
let worker = WorkerOptions::default()
.pg_pool(pg_pool)
.schema("graphile_worker")
.define_job::<SendEmail>()
.init()
.await?;
Or let the worker create its own connection pool from a PostgreSQL URL:
let worker = WorkerOptions::default()
.database_url("postgres://user:password@localhost/mydb")
.max_pg_conn(20)
.define_job::<SendEmail>()
.init()
.await?;
Use schema(...) when you want Graphile Worker tables in a specific PostgreSQL
schema, for example to separate environments or independent worker systems in
the same database.
Runtime And Driver Features
The default crate features enable Tokio, rustls, and the SQLx driver, so a plain dependency is enough:
graphile_worker = "0.13"
To use async-std, disable default features and enable the async-std runtime explicitly:
graphile_worker = { version = "0.13", default-features = false, features = [
"runtime-async-std",
"tls-rustls",
"driver-sqlx",
] }
Feature choices are compile-time configuration, not WorkerOptions methods.
See Runtime, TLS, and Drivers and
Feature Flags before changing them.
Worker Role
A process can register one task, a batch task, or a set of reusable job definitions:
let worker = WorkerOptions::default()
.define_job::<SendEmail>()
.define_batch_job::<ImportContacts>()
.define_jobs(reporting_jobs())
.add_forbidden_flag("high_memory")
.init()
.await?;
Use add_forbidden_flag(...) for specialized workers that must refuse jobs with
certain flags. This is a filtering rule for the worker process; it does not
register a handler by itself.
Throughput And Latency
The primary knobs are concurrency(...), poll_interval(...), local queueing,
and optional batched writes.
use std::time::Duration;
let worker = WorkerOptions::default()
.concurrency(10)
.poll_interval(Duration::from_millis(500))
.complete_job_batch_delay(Duration::from_millis(5))
.fail_job_batch_delay(Duration::from_millis(5))
.define_job::<SendEmail>()
.init()
.await?;
concurrency(...) controls how many jobs can run at the same time.
poll_interval(...) controls fallback polling and future scheduled job checks.
For higher-throughput workloads, Local Queue can
batch-fetch jobs from PostgreSQL and distribute them locally.
Scheduling
Use with_cron(...) for recurring jobs. Text input returns a result because it
can fail to parse:
let worker = WorkerOptions::default()
.define_job::<SendDigest>()
.with_cron("0 8 * * * send_digest")?
.init()
.await?;
Typed cron builders and raw crontab values can also be added through
with_cron(...) or with_crons(...). See Cron Jobs for
the scheduling guide and Scheduling Model for the
underlying concepts.
Application Integration
Use extensions for shared application state, hooks for lifecycle behavior, and shutdown settings when the host application already owns process signals:
use std::time::Duration;
let worker = WorkerOptions::default()
.add_extension(app_config)
.listen_os_shutdown_signals(false)
.shutdown_signal(host_shutdown())
.shutdown_grace_period(Duration::from_secs(5))
.define_job::<SendEmail>()
.init()
.await?;
Shutdown configuration controls when the worker starts graceful shutdown, how long in-flight jobs may continue, and when shutdown-aborted jobs should be made eligible for retry. See Shutdown.
Recovery
Worker recovery is disabled by default. Enable it when your deployment needs jobs locked by crashed or unreachable workers to be released automatically:
use std::time::Duration;
let worker = WorkerOptions::default()
.heartbeat_interval(Duration::from_secs(30))
.sweep_interval(Duration::from_secs(60))
.sweep_threshold(Duration::from_secs(300))
.recovery_delay(Duration::from_secs(30))
.define_job::<SendEmail>()
.init()
.await?;
The recovery convenience setters enable recovery while setting their specific
interval or delay. Use worker_recovery(...) when you want to build and pass a
full recovery configuration object. See Worker Recovery.
Worker Options
WorkerOptions is the main builder used to configure a worker before calling
init(). The builder collects database settings, registered task handlers,
scheduling rules, hooks, and performance options, then init() connects to
PostgreSQL, runs migrations, registers task details, and returns a runnable
worker.
Most applications start with WorkerOptions::default(), chain the options they
need, then call init().await? followed by run().await?.
use graphile_worker::WorkerOptions;
let worker = WorkerOptions::default()
.concurrency(5)
.schema("graphile_worker")
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
worker.run().await?;
Database configuration
Every worker needs a database connection before init() can succeed. You can
either pass an existing connection pool or let the worker create one from a
database URL.
let worker = WorkerOptions::default()
.database_url("postgres://user:password@localhost/mydb")
.max_pg_conn(20)
.define_job::<SendEmail>()
.init()
.await?;
When you already have a SQLx pool, pass it directly:
let worker = WorkerOptions::default()
.pg_pool(pg_pool)
.define_job::<SendEmail>()
.init()
.await?;
Important database methods:
- database(value) uses an existing driver-agnostic database connection.
- pg_pool(pool) is the SQLx convenience wrapper for passing an existing sqlx::PgPool.
- database_url(url) stores the PostgreSQL URL used when no existing connection was provided.
- max_pg_conn(count) sets the maximum pool size when the worker creates a pool from database_url; the default is 20.
If both an existing connection and database_url are provided, the existing
connection takes precedence.
Core runtime settings
Use the core options to control where worker tables live and how aggressively the worker processes jobs.
use std::time::Duration;
let worker = WorkerOptions::default()
.schema("my_app_worker")
.concurrency(10)
.poll_interval(Duration::from_millis(500))
.use_notification_delivery(true)
.use_local_time(false)
.pg_pool(pg_pool)
.define_job::<SendEmail>()
.init()
.await?;
Important core methods:
- schema(name) sets the PostgreSQL schema for Graphile Worker tables. If it is not set, the default schema is graphile_worker.
- concurrency(count) sets the maximum number of jobs processed at the same time. If it is not set, the worker uses the number of logical CPUs. Passing 0 panics.
- poll_interval(duration) controls fallback polling for new or future jobs. The default is one second.
- use_notification_delivery(value) controls whether the worker listens for PostgreSQL NOTIFY wakeups. The default is true; set it to false to use polling only. Notifications are treated as coalesced wakeup hints: when all workers are already saturated, extra notifications are dropped and the runner keeps draining the listener instead of blocking on worker fanout.
- use_local_time(value) controls whether timestamps use application time (true) or PostgreSQL server time (false). The default is PostgreSQL server time.
PostgreSQL server time is the safer default when multiple worker processes run against the same database.
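The coalescing behavior of notifications can be illustrated with a bounded channel of capacity one: the listener uses try_send, so a wakeup that arrives while another is still pending is dropped instead of blocking. This is a minimal sketch using std::sync::mpsc, not the crate's listener. forward_wakeup and drain are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Forward a NOTIFY-style wakeup without ever blocking the listener.
/// Returns false when the hint was dropped.
fn forward_wakeup(tx: &SyncSender<()>) -> bool {
    match tx.try_send(()) {
        Ok(()) => true,
        // A wakeup is already pending: the worker will check the database
        // anyway, so this extra hint can be discarded.
        Err(TrySendError::Full(())) => false,
        // No worker is listening any more; nothing to wake.
        Err(TrySendError::Disconnected(())) => false,
    }
}

/// Count how many wakeups a worker would observe right now.
fn drain(rx: &Receiver<()>) -> usize {
    rx.try_iter().count()
}

fn main() {
    // Capacity 1: at most one pending wakeup at a time.
    let (tx, rx) = sync_channel::<()>(1);
    let delivered = (0..5).filter(|_| forward_wakeup(&tx)).count();
    println!("forwarded {delivered} of 5, worker sees {}", drain(&rx));
}
```

Dropping extra wakeups is safe in this model because one pending wakeup already causes a database check, and polling remains as a fallback.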
Job registration
Register every task handler that this worker should be able to run. The common
case is one or more define_job::<T>() calls:
let worker = WorkerOptions::default()
.define_job::<SendEmail>()
.define_job::<GenerateReport>()
.pg_pool(pg_pool)
.init()
.await?;
For larger applications, modules can expose reusable job definitions and the worker can register them together:
use graphile_worker::{JobDefinition, TaskHandler};
pub fn jobs() -> [JobDefinition; 2] {
[
SendEmail::definition(),
GenerateReport::definition(),
]
}
let worker = WorkerOptions::default()
.define_jobs(jobs())
.pg_pool(pg_pool)
.init()
.await?;
Important job methods:
- define_job::<T>() registers a TaskHandler.
- define_batch_job::<T>() registers a BatchTaskHandler.
- define_jobs(iterable) registers reusable JobDefinition values.
- add_forbidden_flag(flag) makes this worker skip jobs with that flag.
When add_forbidden_flag is used, local queue configuration is disabled during
initialization.
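The forbidden-flag rule itself is a simple set check: a job is skipped if any of its flags appears in the worker's forbidden list. The sketch below is illustrative only. allowed_by_flags is a hypothetical helper, and this section does not show where graphile_worker applies the filter.

```rust
/// True if a worker with `forbidden` flags may run a job carrying `job_flags`.
fn allowed_by_flags(job_flags: &[&str], forbidden: &[&str]) -> bool {
    !job_flags.iter().any(|flag| forbidden.contains(flag))
}

fn main() {
    let forbidden = ["high_memory"];
    println!("{}", allowed_by_flags(&["email"], &forbidden)); // true
    println!("{}", allowed_by_flags(&["email", "high_memory"], &forbidden)); // false
}
```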
Cron schedules
Cron entries are added with with_cron or with_crons. Typed cron builders are
useful when you want Rust types to define the task and payload:
use graphile_worker::{Cron, CronJobKeyMode, CrontabFill, WorkerOptions};
let worker = WorkerOptions::default()
.define_job::<SayHello>()
.pg_pool(pg_pool)
.with_cron(
Cron::every_n_minutes::<SayHello>(2)?
.fill(CrontabFill::minutes(10))
.job_key("say_hello_dedupe")
.job_key_mode(CronJobKeyMode::PreserveRunAt)
.payload(SayHello {
message: "Crontab".to_string(),
})?,
)
.init()
.await?;
Crontab text is also accepted, but because it must be parsed, the call returns a
Result:
let options = WorkerOptions::default()
.define_job::<SendDigest>()
.with_cron("0 8 * * * send_digest")?;
Use with_crons([...]) when you already have multiple typed crontab values to
append.
Local queue
local_queue(config) enables batch-fetching jobs into an in-process local
queue. This can reduce PostgreSQL load for high-throughput workers by fetching
several jobs at once and distributing them locally to the worker's concurrency
slots.
use graphile_worker::{LocalQueueConfig, RefetchDelayConfig, WorkerOptions};
use std::time::Duration;
let worker = WorkerOptions::default()
.concurrency(4)
.schema("example_local_queue")
.define_job::<ProcessItem>()
.pg_pool(pg_pool)
.local_queue(
LocalQueueConfig::default()
.with_size(50)
.with_ttl(Duration::from_secs(60))
.with_refetch_delay(
RefetchDelayConfig::default()
.with_duration(Duration::from_millis(100))
.with_threshold(5)
.with_max_abort_threshold(20),
),
)
.init()
.await?;
Local queue settings are validated against the configured poll_interval during
init(). If forbidden flags are configured, the worker ignores the local queue
configuration.
Hooks, plugins, and extensions
Worker options can also compose application state and hook behavior:
let options = WorkerOptions::default()
.add_extension(app_config)
.add_plugin(LocalQueueMonitorPlugin::default())
.on(JobStart, |ctx| async move {
tracing::info!("job {} started", ctx.job.id());
});
Important composition methods:
- add_extension(value) stores typed application state that handlers can read from the worker context.
- on(event, handler) registers a hook handler directly.
- add_plugin(plugin) registers a plugin that can attach several hooks.
Plugins are a good fit when a feature needs to register several related hooks, such as monitoring local queue events.
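Typed application state of the kind add_extension(...) stores is commonly implemented as a map keyed by TypeId, a pattern also used by the http crate's Extensions type. The sketch below shows that general pattern, assuming values must be Send + Sync + 'static. Extensions and AppConfig here are hypothetical and are not graphile_worker's types.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// A minimal type-keyed store: at most one value per Rust type.
#[derive(Default)]
struct Extensions {
    map: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl Extensions {
    fn insert<T: Send + Sync + 'static>(&mut self, value: T) {
        self.map.insert(TypeId::of::<T>(), Box::new(value));
    }

    fn get<T: 'static>(&self) -> Option<&T> {
        self.map
            .get(&TypeId::of::<T>())
            .and_then(|boxed| (**boxed).downcast_ref::<T>())
    }
}

/// Hypothetical application configuration shared with handlers.
struct AppConfig {
    service_name: String,
}

fn main() {
    let mut ext = Extensions::default();
    ext.insert(AppConfig { service_name: "mailer".to_string() });
    if let Some(config) = ext.get::<AppConfig>() {
        println!("service: {}", config.service_name);
    }
}
```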
Completion and failure batching
For high-throughput workers, completion and permanent failure updates can be batched to reduce SQL round trips:
use std::time::Duration;
let worker = WorkerOptions::default()
.complete_job_batch_delay(Duration::from_millis(5))
.fail_job_batch_delay(Duration::from_millis(5))
.define_job::<ProcessItem>()
.pg_pool(pg_pool)
.init()
.await?;
complete_job_batch_delay(delay) batches successful completion updates.
fail_job_batch_delay(delay) batches permanent failure updates. Retryable
failures are still processed individually so retry backoff timing can be
preserved.
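A batching delay adds a few milliseconds of latency in exchange for fewer round trips. The std-only sketch below shows that trade-off. group_into_flushes is a hypothetical helper, and it assumes events arrive sorted by time in milliseconds. It opens a window at the first pending completion and flushes everything that arrives before the delay elapses. This is a model of the idea, not the crate's batcher.

```rust
/// Hypothetical model of completion batching. `events` holds
/// `(job_id, completed_at_ms)` pairs sorted by time. A flush window opens at
/// the first pending completion and closes `delay_ms` later; every completion
/// inside the window is written in a single round trip.
fn group_into_flushes(events: &[(u64, u64)], delay_ms: u64) -> Vec<Vec<u64>> {
    let mut flushes: Vec<Vec<u64>> = Vec::new();
    let mut window_end: Option<u64> = None;
    for &(job_id, at_ms) in events {
        let in_window = matches!(window_end, Some(end) if at_ms <= end);
        if in_window {
            // Join the batch that is already waiting to be flushed.
            flushes.last_mut().expect("an open window has a batch").push(job_id);
        } else {
            // Start a new batch and set its flush deadline.
            window_end = Some(at_ms + delay_ms);
            flushes.push(vec![job_id]);
        }
    }
    flushes
}
```

In this model, with a 5 ms delay, completions at 0, 2, 4, 10, and 11 ms produce two flushes instead of five separate updates.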
Composition examples
Basic worker
This is the smallest common shape: configure the database, register jobs, and run.
let worker = WorkerOptions::default()
.schema("example_simple_worker")
.concurrency(2)
.define_job::<SayHello>()
.pg_pool(pg_pool)
.init()
.await?;
worker.run().await?;
Scheduled worker
Scheduled workers are ordinary workers with cron entries added before initialization.
let worker = WorkerOptions::default()
.schema("example_simple_worker")
.concurrency(10)
.define_job::<SayHello>()
.define_job::<SayHello2>()
.pg_pool(pg_pool)
.with_cron(
Cron::every_n_minutes::<SayHello>(2)?
.fill(CrontabFill::minutes(10))
.payload(SayHello {
message: "Crontab".to_string(),
})?,
)
.init()
.await?;
Throughput-oriented worker
For job bursts, combine a tuned concurrency value with local queueing and small batching delays.
use std::time::Duration;
let worker = WorkerOptions::default()
.schema("jobs")
.concurrency(16)
.poll_interval(Duration::from_millis(500))
.complete_job_batch_delay(Duration::from_millis(5))
.fail_job_batch_delay(Duration::from_millis(5))
.local_queue(
LocalQueueConfig::default()
.with_size(100)
.with_ttl(Duration::from_secs(60)),
)
.define_jobs(jobs())
.pg_pool(pg_pool)
.init()
.await?;
Tune these values with your own workload, PostgreSQL latency, pool size, and job duration. Higher concurrency is useful only when the database pool and task workload can support it.
Runtime, TLS, and Drivers
Graphile Worker RS exposes three separate feature choices for database access:
- an async runtime feature
- a TLS backend feature
- a PostgreSQL driver feature
The default crate features enable the common path:
[dependencies]
graphile_worker = "0.13"
This is equivalent to enabling runtime-tokio, tls-rustls, and
driver-sqlx.
Feature Groups
Runtime features choose the async runtime used by the worker internals:
| Feature | Notes |
|---|---|
| runtime-tokio | Default runtime. Required by driver-tokio-postgres. |
| runtime-async-std | Supported with the SQLx driver. |
TLS features choose the TLS implementation passed through to the database crates:
| Feature | Notes |
|---|---|
| tls-rustls | Default TLS backend. |
| tls-native-tls | Native TLS backend. |
Driver features choose which PostgreSQL client integration is compiled:
| Feature | Notes |
|---|---|
| driver-sqlx | Default driver. Enables the optional sqlx dependency. |
| driver-tokio-postgres | Enables the tokio-postgres based driver path and also enables runtime-tokio. |
Valid Combinations
Use one runtime, one TLS backend when your driver needs TLS, and one driver. The combinations exercised by the repository test matrix are:
| Runtime | Driver | TLS in tested command | Status |
|---|---|---|---|
| runtime-tokio | driver-sqlx | tls-rustls | Default tested path. |
| runtime-async-std | driver-sqlx | tls-rustls | Tested SQLx async-std path. |
| runtime-tokio | driver-tokio-postgres | Not added by the matrix command | Tested tokio-postgres path. |
driver-tokio-postgres is Tokio-only. The repository runtime test command
rejects driver-tokio-postgres with any runtime other than runtime-tokio.
Cargo Examples
Use defaults unless you have a reason to choose another runtime or driver:
[dependencies]
graphile_worker = "0.13"
To make the default feature set explicit:
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = [
"runtime-tokio",
"tls-rustls",
"driver-sqlx",
] }
To use SQLx with async-std:
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = [
"runtime-async-std",
"tls-rustls",
"driver-sqlx",
] }
To use the tokio-postgres driver:
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = [
"driver-tokio-postgres",
] }
driver-tokio-postgres enables runtime-tokio for Graphile Worker RS. Add a
TLS feature only when the rest of your database stack needs one from this crate.
SQLx Driver Executor Arguments
With driver-sqlx, the SQL helpers accept the following SQLx executors, all of which are exercised by the tests:
- a pool
- an acquired connection
- a transaction
As a result, the direct SQL helpers can participate in a caller-owned transaction. In the tested path, a job added inside a SQLx transaction is not persisted if the caller rolls the transaction back.
use graphile_worker::sql::add_job::single::add_job;
use graphile_worker::{JobSpec, Schema};
use serde_json::json;
let mut tx = pool.begin().await?;
add_job(
&mut tx,
&Schema::default(),
"send_email",
json!({ "user_id": 42 }),
JobSpec::default(),
false,
)
.await?;
tx.commit().await?;
The lower-level DbExecutorArg path is also tested for execute, fetch_all,
and fetch_one with SQLx pool, connection, and transaction executors.
tokio-postgres Driver Executor Arguments
With driver-tokio-postgres, the SQL helpers accept the following tokio-postgres and
deadpool-postgres executor arguments, all of which are exercised by the tests:
- tokio_postgres::Client
- tokio_postgres::Transaction
- deadpool_postgres::Pool
- a deadpool_postgres client checked out from the pool
use graphile_worker::sql::add_job::single::add_job;
use graphile_worker::{JobSpec, Schema};
use serde_json::json;
use tokio_postgres::NoTls;
let (client, connection) =
tokio_postgres::connect("postgres://postgres:postgres@localhost/postgres", NoTls).await?;
tokio::spawn(async move {
let _ = connection.await;
});
add_job(
&client,
&Schema::default(),
"send_email",
json!({ "user_id": 42 }),
JobSpec::default(),
false,
)
.await?;
The tested tokio-postgres path also verifies that jobs added through a tokio-postgres transaction participate in the caller transaction and are not persisted after rollback.
Running the Matrix Locally
The project justfile defines targeted runtime and driver checks. If
DATABASE_URL is not set, the runtime target delegates to the Docker-backed
variant.
just test-runtime runtime-tokio driver-sqlx
just test-runtime runtime-async-std driver-sqlx
just test-runtime runtime-tokio driver-tokio-postgres
The combined Docker matrix runs the same supported combinations against a local PostgreSQL container:
just test-docker-all-matrices
Shutdown
Graphile Worker shuts down gracefully when its shutdown signal completes. By default this includes OS shutdown signals, but applications that already own process lifecycle management can provide their own future instead.
Shutdown behavior is configured with WorkerShutdownConfig or with the
matching convenience methods on WorkerOptions.
Defaults
WorkerShutdownConfig::default() uses these values:
use graphile_worker::WorkerShutdownConfig;
use std::time::Duration;
let shutdown = WorkerShutdownConfig::default()
.listen_os_shutdown_signals(true)
.grace_period(Duration::from_secs(5))
.interrupted_job_retry_delay(Duration::from_secs(30));
The default configuration:
- listens for OS shutdown signals
- gives in-flight jobs 5 seconds to finish after shutdown starts
- retries shutdown-aborted jobs after 30 seconds
- has no custom application shutdown signal
OS Signals
When listen_os_shutdown_signals is enabled, Graphile Worker installs the
platform shutdown listener provided by graphile-worker-shutdown-signal.
On Unix targets, the listener completes after one of these signals is received:
- SIGUSR2
- SIGINT
- SIGPIPE
- SIGTERM
- SIGHUP
On Windows targets, the listener handles these console control events:
- CTRL_C_EVENT
- CTRL_CLOSE_EVENT
- CTRL_LOGOFF_EVENT
- CTRL_SHUTDOWN_EVENT
This is enabled by default:
use graphile_worker::WorkerOptions;
let worker = WorkerOptions::default()
.listen_os_shutdown_signals(true)
// ... other configuration
.init()
.await?;
Disable OS signal listeners when the host application already installs its own handlers:
use graphile_worker::WorkerOptions;
let worker = WorkerOptions::default()
.listen_os_shutdown_signals(false)
// ... other configuration
.init()
.await?;
Custom Shutdown Signal
A custom shutdown signal is any Future<Output = ()> + Send + 'static. The
future should complete when the host application requests shutdown.
use graphile_worker::{WorkerOptions, WorkerShutdownConfig};
let shutdown = WorkerShutdownConfig::default()
.listen_os_shutdown_signals(false)
.shutdown_signal(async {
wait_for_application_shutdown().await;
});
let worker = WorkerOptions::default()
.worker_shutdown(shutdown)
// ... other configuration
.init()
.await?;
worker.run().await?;
You can also set the signal directly on WorkerOptions:
use graphile_worker::WorkerOptions;
let worker = WorkerOptions::default()
.listen_os_shutdown_signals(false)
.shutdown_signal(async {
wait_for_application_shutdown().await;
})
// ... other configuration
.init()
.await?;
If both OS signal listening and a custom signal are configured, shutdown starts when either signal completes.
Grace Period
grace_period controls how long in-flight jobs may continue after a shutdown
signal is received.
use graphile_worker::WorkerShutdownConfig;
use std::time::Duration;
let shutdown = WorkerShutdownConfig::default()
.grace_period(Duration::from_secs(20));
The equivalent WorkerOptions convenience method is:
use graphile_worker::WorkerOptions;
use std::time::Duration;
let worker = WorkerOptions::default()
.shutdown_grace_period(Duration::from_secs(20))
// ... other configuration
.init()
.await?;
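The shutdown signal only decides when shutdown starts. After it is received, Graphile Worker still performs the graceful draining of in-flight jobs itself, within the configured grace period.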
Interrupted Job Retry Delay
interrupted_job_retry_delay controls when a job aborted by shutdown becomes
eligible to run again.
use graphile_worker::WorkerShutdownConfig;
use std::time::Duration;
let shutdown = WorkerShutdownConfig::default()
.interrupted_job_retry_delay(Duration::from_secs(60));
The equivalent WorkerOptions convenience method is:
use graphile_worker::WorkerOptions;
use std::time::Duration;
let worker = WorkerOptions::default()
.shutdown_interrupted_job_retry_delay(Duration::from_secs(60))
// ... other configuration
.init()
.await?;
Shutdown-aborted jobs use this delay instead of normal failure backoff, so shutdown does not consume a retry attempt. This behavior applies to ordinary graceful shutdown even when worker recovery is disabled.
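The std-only model below shows how the two settings interact, using the default values (a 5 s grace period and a 30 s retry delay). InFlight, Drained, and drain are hypothetical. Two parts of the model are assumptions rather than behavior taken from the crate: the retry delay is counted from the end of the grace period, and the `remaining` field makes each outcome deterministic, whereas real jobs are simply aborted when time runs out.

```rust
use std::time::Duration;

/// A job still running when shutdown starts (hypothetical model).
struct InFlight {
    job_id: u64,
    /// How much work the job still has left when shutdown starts.
    remaining: Duration,
    /// Attempt count recorded before this run.
    attempts: u32,
}

#[derive(Debug, PartialEq)]
enum Drained {
    /// Finished inside the grace period.
    Completed { job_id: u64 },
    /// Aborted when the grace period ran out. `eligible_after` is measured
    /// from the moment shutdown started; the attempt count is unchanged.
    Rescheduled { job_id: u64, eligible_after: Duration, attempts: u32 },
}

fn drain(jobs: &[InFlight], grace_period: Duration, interrupted_job_retry_delay: Duration) -> Vec<Drained> {
    jobs.iter()
        .map(|job| {
            if job.remaining <= grace_period {
                Drained::Completed { job_id: job.job_id }
            } else {
                Drained::Rescheduled {
                    job_id: job.job_id,
                    eligible_after: grace_period + interrupted_job_retry_delay,
                    // Shutdown does not consume a retry attempt.
                    attempts: job.attempts,
                }
            }
        })
        .collect()
}
```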
Application State and Extensions
Task handlers often need access to dependencies that are not part of the job payload: configuration, API clients, counters, caches, or other process-local state. Graphile Worker RS exposes this through worker extensions.
An extension is a value registered on WorkerOptions and later read from
WorkerContext by type. Each handler receives a WorkerContext, so the same
shared state is available wherever jobs run in that worker process.
Register Shared State
Register application state with WorkerOptions::add_extension before calling
init().
use graphile_worker::WorkerOptions;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
#[derive(Clone, Debug)]
struct AppState {
run_count: Arc<AtomicUsize>,
}
impl AppState {
fn new() -> Self {
Self {
run_count: Arc::new(AtomicUsize::new(0)),
}
}
fn increment_run_count(&self) -> usize {
self.run_count.fetch_add(1, SeqCst)
}
}
let worker = WorkerOptions::default()
.define_job::<ShowRunCount>()
.pg_pool(pg_pool)
.add_extension(AppState::new())
.init()
.await?;
Extension values are stored by Rust type. add_extension accepts values that
are Clone + Send + Sync + Debug + 'static. If another value with the same type
is inserted into the same extension set, the later value replaces the earlier
one.
Use a wrapper type when you need to store two values with the same underlying type:
#[derive(Clone, Debug)]
struct PublicApiBaseUrl(String);
#[derive(Clone, Debug)]
struct InternalApiBaseUrl(String);
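Type-keyed storage like this is commonly built on std::any::TypeId. The std-only sketch below shows why a second value of the same type replaces the first, and why wrapper types avoid that collision. Extensions is a hypothetical stand-in, not the crate's container. It only requires Send + Sync + 'static rather than the full bound listed above.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical type-keyed container: at most one value per Rust type.
#[derive(Default)]
struct Extensions {
    values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl Extensions {
    /// Stores `value` under its type and returns the value it replaced, if any.
    fn insert<T: Send + Sync + 'static>(&mut self, value: T) -> Option<T> {
        self.values
            .insert(TypeId::of::<T>(), Box::new(value))
            .and_then(|old| old.downcast::<T>().ok())
            .map(|old| *old)
    }

    /// Looks a value up by type, similar in spirit to `ctx.get_ext::<T>()`.
    fn get<T: 'static>(&self) -> Option<&T> {
        self.values
            .get(&TypeId::of::<T>())
            .and_then(|value| value.downcast_ref::<T>())
    }
}

// Wrapper types give two `String` values distinct keys.
#[derive(Debug, PartialEq)]
struct PublicApiBaseUrl(String);
#[derive(Debug, PartialEq)]
struct InternalApiBaseUrl(String);
```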
Read State From a Handler
Inside a task handler, call ctx.get_ext::<T>() to retrieve an extension by
type. The method returns Option<&T>, so handlers can decide whether missing
state is a job error or an application configuration error.
use graphile_worker::{IntoTaskHandlerResult, WorkerContext};
use graphile_worker_task_handler::TaskHandler;
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct ShowRunCount;
impl TaskHandler for ShowRunCount {
const IDENTIFIER: &'static str = "show_run_count";
async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
let app_state = ctx
.get_ext::<AppState>()
.ok_or_else(|| "AppState extension is not configured".to_string())?;
let run_count = app_state.increment_run_count();
println!("Run count: {run_count}");
Ok::<(), String>(())
}
}
Handlers receive read-only access to the extension container. If the state
itself must be mutated or shared across concurrent jobs, put the synchronization
inside your state type, for example with Arc, atomics, or other thread-safe
interior mutability.
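The std-only illustration below places synchronization inside the state type. Every clone of AppState shares one Arc<AtomicUsize>, so concurrent updates are not lost. OS threads stand in for concurrently running job handlers, and simulate_jobs is a hypothetical helper. Unlike the increment_run_count shown earlier, this variant returns the updated count, because fetch_add returns the previous value.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

#[derive(Clone, Debug)]
struct AppState {
    run_count: Arc<AtomicUsize>,
}

impl AppState {
    fn new() -> Self {
        Self { run_count: Arc::new(AtomicUsize::new(0)) }
    }

    /// Returns the count after this run (fetch_add yields the old value).
    fn increment_run_count(&self) -> usize {
        self.run_count.fetch_add(1, Ordering::SeqCst) + 1
    }
}

/// Runs `jobs_per_worker` increments on each of `workers` threads, each
/// holding its own clone of the state, and returns the final count.
fn simulate_jobs(state: &AppState, workers: usize, jobs_per_worker: usize) -> usize {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let state = state.clone();
            thread::spawn(move || {
                for _ in 0..jobs_per_worker {
                    state.increment_run_count();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    state.run_count.load(Ordering::SeqCst)
}
```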
What WorkerContext Provides
WorkerContext is the per-job context passed to every task handler. In addition
to extensions, it exposes job and worker data such as:
- ctx.payload() for the JSON payload.
- ctx.job() for the complete job record.
- ctx.database() for the database handle.
- ctx.pg_pool() when the SQLx driver feature is used.
- ctx.schema() for the configured Graphile Worker schema.
- ctx.worker_id() for the worker processing the job.
- ctx.get_ext::<T>() for application-specific extensions.
Use extensions for dependencies owned by your application. Use the built-in context methods for worker metadata and database access.
Context Helper Pattern
Some handlers need to enqueue follow-up work. Import WorkerContextExt to use
the helper methods implemented for WorkerContext.
use chrono::{offset::Utc, Duration};
use graphile_worker::{
IntoTaskHandlerResult, JobSpecBuilder, WorkerContext, WorkerContextExt,
};
use graphile_worker_task_handler::TaskHandler;
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize, Clone)]
struct SendWs {
request_id: String,
}
impl TaskHandler for SendWs {
const IDENTIFIER: &'static str = "send_ws";
async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("[send_ws] sent request {}", self.request_id);
ctx.add_job(
CheckWs {
request_id: self.request_id,
},
JobSpecBuilder::new()
.run_at(Utc::now() + Duration::seconds(10))
.build(),
)
.await
.map_err(|e| e.to_string())?;
Ok::<(), String>(())
}
}
#[derive(Deserialize, Serialize, Clone)]
struct CheckWs {
request_id: String,
}
The helper trait creates a WorkerUtils value from the current context, using
the same database, schema, task details, and local-time setting as the running
worker. It provides typed and raw job enqueueing helpers:
- ctx.utils()
- ctx.add_job(...)
- ctx.add_raw_job(...)
- ctx.add_jobs(...)
- ctx.add_raw_jobs(...)
- ctx.add_batch_job(...)
This pattern keeps handler code small: use extensions for application-owned
dependencies, and use WorkerContextExt when the handler needs to schedule more
work through the same worker configuration.
Guides
Use these guides when you already know the basic worker shape and want to wire Graphile Worker RS into a real application flow. If you are new to the crate, start with Quick Start or First Worker, then come back here for focused workflows.
Pick a Guide
| Goal | Start here |
|---|---|
| Add work to the queue from application code | Scheduling Jobs |
| Group related work and control batch behavior | Batch Jobs |
| Run recurring work on a cron schedule | Cron Jobs |
| Understand low-latency in-process job dispatch | Local Queue |
| Recover jobs after crashed or stale workers | Worker Recovery |
| Observe or customize job lifecycle events | Lifecycle Hooks |
| Inspect, retry, or manage jobs from code | Job Management |
Common Starting Points
Define a Task Handler
Most guides assume you have a task type that implements TaskHandler. A task
has a stable identifier, a serializable payload, and an async run method:
use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
subject: String,
body: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Sending email to {}", self.to);
Ok::<(), String>(())
}
}
For the full setup flow, see First Worker.
Register Jobs on a Worker
Register task handlers with WorkerOptions, then provide a PostgreSQL pool and
initialize the worker:
let worker = graphile_worker::WorkerOptions::default()
.concurrency(5)
.schema("graphile_worker")
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
For configuration details, see Worker Options, Runtime, TLS, and Drivers, and Application State and Extensions.
Choose the Operational Path
After the worker is running, the guide you need depends on how jobs should be created and supervised:
- Use Scheduling Jobs for ordinary background work such as sending emails, generating PDFs, or deferring slow application tasks.
- Use Cron Jobs when the work should be created from a recurring schedule instead of a user request.
- Use Worker Recovery for deployments where a process crash, network partition, forced abort, or orchestrator shutdown could leave jobs locked.
- Use Lifecycle Hooks when you need code to run around job events.
- Use Job Management when your application needs utility operations for existing jobs.
Related References
- Tasks and Payloads explains the task model used by these guides.
- Queues and Concurrency covers how workers limit and coordinate job execution.
- Shutdown describes graceful shutdown behavior.
- Migrations covers preparing the PostgreSQL schema used by the worker.
- Troubleshooting collects common diagnosis paths when a guide does not match what you see at runtime.
Scheduling Jobs
Jobs can be scheduled either from SQL or from Rust. Both paths write to the same Graphile Worker tables and use the same job options: task identifier, JSON payload, queue name, scheduled time, retry limit, job key, priority, and flags.
Use SQL when you are already inside a database migration, trigger, or stored
procedure. Use WorkerUtils when scheduling from Rust application code.
Scheduling From SQL
The worker schema exposes an add_job function. The default schema is
graphile_worker, so a minimal insert looks like this:
select * from graphile_worker.add_job(
identifier => 'send_email',
payload => '{"to":"user@example.com"}'::json
);
The Rust scheduler calls the same function with these named arguments:
select * from graphile_worker.add_job(
identifier => 'send_email',
payload => '{"to":"user@example.com"}'::json,
queue_name => 'mailers',
run_at => now() + interval '5 minutes',
max_attempts => 10,
job_key => 'send_email:user@example.com',
priority => 5,
flags => array['transactional'],
job_key_mode => 'replace'
);
identifier must match a registered task identifier in your worker process.
payload is passed to the task handler as JSON.
Scheduling From Rust
Create WorkerUtils from a running worker when you want it to share the
worker's database, schema, hooks, task details, and time configuration:
let utils = worker.create_utils();
For a typed task, pass the task payload and a JobSpec. The task identifier
comes from TaskHandler::IDENTIFIER, and the payload is serialized with
serde_json.
use graphile_worker::{JobSpec, TaskHandler};
let job = utils
.add_job(
SendEmail {
to: "user@example.com".to_string(),
subject: "Welcome".to_string(),
},
JobSpec::default(),
)
.await?;
For dynamic scheduling, use a raw identifier and any serializable payload:
use graphile_worker::JobSpec;
use serde_json::json;
let job = utils
.add_raw_job(
"send_email",
json!({
"to": "user@example.com",
"subject": "Welcome"
}),
JobSpec::default(),
)
.await?;
Raw jobs are useful when the caller does not know the Rust task type at compile
time. Typed jobs are preferred when the task type is available because the
identifier and payload type stay tied to the TaskHandler implementation.
Job Options
JobSpec controls how the job is inserted:
use chrono::{Duration, Utc};
use graphile_worker::{JobKeyMode, JobSpecBuilder};
let spec = JobSpecBuilder::new()
.queue_name("mailers")
.run_at(Utc::now() + Duration::minutes(5))
.max_attempts(10)
.job_key("send_email:user@example.com")
.job_key_mode(JobKeyMode::Replace)
.priority(5)
.flags(vec!["transactional".to_string()])
.build();
All JobSpec fields are optional. JobSpec::default() or
JobSpec::new() schedules with no explicit options.
| Field | Effect |
|---|---|
| queue_name | Assigns the job to a named queue. |
| run_at | Delays execution until the given UTC timestamp. |
| max_attempts | Sets the maximum number of attempts before the job is permanently failed. |
| job_key | Deduplicates scheduled jobs with the same key. |
| job_key_mode | Controls how an existing keyed job is handled. |
| priority | Sets job priority. Lower numbers run sooner. |
| flags | Stores string flags on the job for worker features and custom conventions. |
When run_at is not supplied, the database function schedules the job using
its default time behavior. If the worker is configured to use local application
time, WorkerUtils supplies Utc::now() for jobs without an explicit run_at.
Job Keys
Set job_key when multiple scheduling attempts should refer to the same
logical job.
use graphile_worker::{JobKeyMode, JobSpecBuilder};
let spec = JobSpecBuilder::new()
.job_key("recalculate-account:42")
.job_key_mode(JobKeyMode::Replace)
.build();
utils.add_job(RecalculateAccount { account_id: 42 }, spec).await?;
The supported modes are:
| Mode | Behavior visible from the scheduler |
|---|---|
| JobKeyMode::Replace | Replaces the existing keyed job data. This is the default mode when a key is used. |
| JobKeyMode::PreserveRunAt | Updates the keyed job but keeps its existing run_at. |
| JobKeyMode::UnsafeDedupe | Deduplicates without replacing the existing run_at; supported for single-job scheduling only. |
When a keyed job is updated, the stored job is reused and its revision is incremented. If a keyed job is locked while a replacement is scheduled, the implementation retires the locked row for normal failure handling and keeps a single replacement row with the key.
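The std-only model below covers the two replace-style modes from the table. KeyedJob and schedule_keyed are hypothetical, and run_at is a plain number. UnsafeDedupe and locked-job handling are left out. Starting the revision at 0 is an assumption of this sketch.

```rust
/// Hypothetical subset of the key modes described above.
#[derive(Clone, Copy, Debug, PartialEq)]
enum KeyMode {
    Replace,
    PreserveRunAt,
}

/// The row stored for a job key (run_at is a plain number here).
#[derive(Clone, Debug, PartialEq)]
struct KeyedJob {
    payload: String,
    run_at: u64,
    revision: u32,
}

/// Schedules `payload` under a job key, reusing `existing` if there is one.
fn schedule_keyed(existing: Option<KeyedJob>, payload: &str, run_at: u64, mode: KeyMode) -> KeyedJob {
    match existing {
        // No job with this key yet: insert a fresh one.
        None => KeyedJob { payload: payload.to_string(), run_at, revision: 0 },
        // Reuse the stored job: new payload, bumped revision, and a run_at
        // that depends on the mode.
        Some(job) => KeyedJob {
            payload: payload.to_string(),
            run_at: match mode {
                KeyMode::Replace => run_at,
                KeyMode::PreserveRunAt => job.run_at,
            },
            revision: job.revision + 1,
        },
    }
}
```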
Bulk Scheduling
Use add_jobs to schedule many jobs of the same task type in one database
round trip:
use graphile_worker::{JobSpec, JobSpecBuilder};
let urgent = JobSpecBuilder::new().priority(-10).build();
let normal = JobSpec::default();
let jobs = vec![
(SendEmail { to: "a@example.com".to_string(), subject: "Welcome".to_string() }, &urgent),
(SendEmail { to: "b@example.com".to_string(), subject: "Welcome".to_string() }, &normal),
];
let added = utils.add_jobs::<SendEmail>(&jobs).await?;
Each item can reference its own JobSpec, or many items can share the same
spec.
Use add_raw_jobs when a single bulk insert needs heterogeneous task
identifiers:
use graphile_worker::{JobSpec, RawJobSpec};
use serde_json::json;
let jobs = vec![
RawJobSpec {
identifier: "send_email".into(),
payload: json!({ "to": "a@example.com" }),
spec: JobSpec::default(),
},
RawJobSpec {
identifier: "refresh_search_index".into(),
payload: json!({ "model": "account", "id": 42 }),
spec: JobSpec::default(),
},
];
let added = utils.add_raw_jobs(&jobs).await?;
Empty bulk inputs return an empty Vec and do not insert jobs.
Bulk scheduling has two job-key limitations:
- JobKeyMode::UnsafeDedupe is rejected for add_jobs and add_raw_jobs.
- If any job in the batch uses JobKeyMode::PreserveRunAt, the batch call applies the preserve-run-at setting to the whole underlying database operation, not only to that job. Use individual add_job or add_raw_job calls when each job needs independent key-mode behavior.
Attempts, Priority, And Flags
max_attempts controls the retry limit for the scheduled job. Failed jobs are
retried by the worker until their attempt count reaches this limit.
priority is stored as an integer. Jobs are fetched in ascending priority
order, so lower values, including negative values, run before the default
priority 0.
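The std-only sketch below expresses the ordering rule: only jobs whose run_at has passed are eligible, and lower priorities come first. QueuedJob and fetch_order are hypothetical. Breaking ties by run_at and then id is an assumption of this sketch, not documented behavior.

```rust
/// A pending job (hypothetical model; times are plain numbers).
struct QueuedJob {
    id: u64,
    priority: i32,
    run_at: u64,
}

/// Returns the ids of jobs that are due at `now`, in ascending priority
/// order, breaking ties by run_at and then id.
fn fetch_order(jobs: &[QueuedJob], now: u64) -> Vec<u64> {
    let mut due: Vec<&QueuedJob> = jobs.iter().filter(|job| job.run_at <= now).collect();
    due.sort_by_key(|job| (job.priority, job.run_at, job.id));
    due.into_iter().map(|job| job.id).collect()
}
```

In the test data, job 5 has the lowest priority value but is not yet due, so it is skipped.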
flags are stored as an array of strings. The scheduler does not interpret
custom flag names while adding jobs; it passes them through to the database.
For changing jobs after they have been scheduled, see Job Management.
Batch Jobs
Graphile Worker RS has two batch-related APIs:
- add_jobs and add_raw_jobs insert many ordinary jobs in one call.
- BatchTaskHandler processes a JSON array stored in one job payload.
Use bulk adds when you want to enqueue many independent jobs efficiently. Use a batch handler when one task run should receive a group of items and decide which items completed or failed.
Bulk Add Ordinary Jobs
WorkerUtils::add_jobs inserts multiple jobs for the same typed task. Each item
is still stored as its own job row and is processed by the normal TaskHandler
implementation.
use graphile_worker::{JobSpec, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};
#[derive(Clone, Deserialize, Serialize)]
struct SendEmail {
to: String,
subject: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> Result<(), String> {
send_email(self.to, self.subject).await
}
}
let spec = JobSpec::default();
let jobs = [
(
SendEmail {
to: "alice@example.com".into(),
subject: "Welcome!".into(),
},
&spec,
),
(
SendEmail {
to: "bob@example.com".into(),
subject: "Welcome!".into(),
},
&spec,
),
];
let added_jobs = worker.create_utils().add_jobs::<SendEmail>(&jobs).await?;
The JobSpec is supplied per job. You can reuse one spec for all jobs or pass
different specs to set different priorities, queues, run times, or other job
options.
use graphile_worker::{JobSpec, JobSpecBuilder};
let urgent = JobSpecBuilder::new().priority(-10).build();
let normal = JobSpec::default();
worker
.create_utils()
.add_jobs::<SendEmail>(&[
(urgent_email, &urgent),
(normal_email, &normal),
])
.await?;
For heterogeneous batches, use add_raw_jobs. Each RawJobSpec carries its own
task identifier, JSON payload, and JobSpec.
use graphile_worker::{JobSpec, JobSpecBuilder, RawJobSpec};
use serde_json::json;
let added_jobs = worker
.create_utils()
.add_raw_jobs(&[
RawJobSpec {
identifier: "send_email".into(),
payload: json!({
"to": "dave@example.com",
"subject": "Notification"
}),
spec: JobSpec::default(),
},
RawJobSpec {
identifier: "process_payment".into(),
payload: json!({ "user_id": 123, "amount": 50 }),
spec: JobSpecBuilder::new().priority(-10).build(),
},
])
.await?;
Bulk-added ordinary jobs are fetched and run like any other job. They do not
call run_batch.
Batch Handlers
A batch job is one job row whose payload is a JSON array. Register it with
define_batch_job and implement BatchTaskHandler for the item type stored in
that array.
use graphile_worker::{
BatchTaskHandler, IntoBatchTaskHandlerResult, JobSpec, WorkerContext,
WorkerOptions,
};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct PendingNotification {
user_id: String,
message_id: String,
}
impl BatchTaskHandler for PendingNotification {
const IDENTIFIER: &'static str = "send_notifications";
async fn run_batch(
items: Vec<Self>,
_ctx: WorkerContext,
) -> impl IntoBatchTaskHandlerResult {
let mut results = Vec::with_capacity(items.len());
for item in items {
results.push(
send_notification(item)
.await
.map_err(|error| error.to_string()),
);
}
results
}
}
let worker = WorkerOptions::default()
.define_batch_job::<PendingNotification>()
// ... database pool and other options
.init()
.await?;
worker
.create_utils()
.add_batch_job(
vec![
PendingNotification {
user_id: "1".into(),
message_id: "a".into(),
},
PendingNotification {
user_id: "1".into(),
message_id: "b".into(),
},
],
JobSpec::default(),
)
.await?;
The stored payload for that job is an array:
[
{ "user_id": "1", "message_id": "a" },
{ "user_id": "1", "message_id": "b" }
]
add_batch_job requires at least one item. If the final payload is not a JSON
array, or is an empty array, the job is rejected before insertion.
Return Values and Retries
Batch handlers can report success for the whole batch or per item.
Returning Ok(()), (), or BatchTaskResult::Complete completes the whole job.
impl BatchTaskHandler for PendingNotification {
const IDENTIFIER: &'static str = "send_notifications";
async fn run_batch(
_items: Vec<Self>,
_ctx: WorkerContext,
) -> impl IntoBatchTaskHandlerResult {
Ok::<(), String>(())
}
}
Returning one result per payload item lets Graphile Worker RS retry only the
failed items. The result vector must have the same length as the input items.
impl BatchTaskHandler for PendingNotification {
const IDENTIFIER: &'static str = "send_notifications";
async fn run_batch(
items: Vec<Self>,
_ctx: WorkerContext,
) -> impl IntoBatchTaskHandlerResult {
items
.into_iter()
.map(|item| {
if should_retry(&item) {
Err(format!("failed {}", item.message_id))
} else {
Ok(())
}
})
.collect::<Vec<_>>()
}
}
If only some item results fail, the worker removes successful items from the payload and leaves a retry job containing only the failed items. If the whole handler fails, or if the result vector length does not match the number of input items, the original payload is retried.
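The std-only model below captures the retry rules from the paragraph above. BatchOutcome and retry_payload are hypothetical and work on already-deserialized items. Real jobs operate on the stored JSON array.

```rust
/// What a batch handler reported (hypothetical model).
enum BatchOutcome<E> {
    /// One result for the whole batch.
    Whole(Result<(), E>),
    /// One result per payload item.
    PerItem(Vec<Result<(), E>>),
}

/// Returns the payload for the retry job, or `None` if nothing needs retrying.
fn retry_payload<T: Clone, E>(items: &[T], outcome: BatchOutcome<E>) -> Option<Vec<T>> {
    match outcome {
        BatchOutcome::Whole(Ok(())) => None,
        // The whole handler failed: retry the original payload.
        BatchOutcome::Whole(Err(_)) => Some(items.to_vec()),
        // A length mismatch cannot be matched item by item: retry everything.
        BatchOutcome::PerItem(results) if results.len() != items.len() => Some(items.to_vec()),
        // Keep only the items whose result is an error.
        BatchOutcome::PerItem(results) => {
            let failed: Vec<T> = items
                .iter()
                .zip(&results)
                .filter(|(_, result)| result.is_err())
                .map(|(item, _)| item.clone())
                .collect();
            if failed.is_empty() { None } else { Some(failed) }
        }
    }
}
```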
Invalid stored payloads are treated as job failures:
- A batch handler requires the job payload to be a JSON array.
- Every array item must deserialize into the batch item type.
- Deserialization errors leave the original payload on the retried job.
Adding Batch Jobs From a Handler
WorkerContext can enqueue a typed batch job from inside another task.
impl TaskHandler for ParentJob {
const IDENTIFIER: &'static str = "batch_parent_job";
async fn run(self, ctx: WorkerContext) -> Result<(), String> {
ctx.add_batch_job(
vec![
PendingNotification {
user_id: "1".into(),
message_id: "a".into(),
},
PendingNotification {
user_id: "1".into(),
message_id: "b".into(),
},
],
JobSpec::default(),
)
.await
.map(|_| ())
.map_err(|error| error.to_string())
}
}
The inserted job uses the batch task identifier and stores the provided items as one JSON array payload.
Internal Completion and Failure Batching
The worker also batches some internal persistence work when completing or failing jobs. Completion and failure requests are collected for a configured delay, then flushed together. On shutdown, the batcher drains queued requests and flushes them before exiting. If the batcher has already closed, the worker falls back to direct completion or failure persistence for that job.
This internal persistence batching is separate from BatchTaskHandler: it does
not change how many payload items your handler receives.
Cron Jobs
Cron jobs schedule registered tasks from cron-like definitions. They are useful for recurring work such as cleanup, reporting, reconciliation, and periodic synchronization.
Cron entries can be added with typed Rust builders or with crontab text:
- WorkerOptions::with_cron(...) accepts one cron entry. It accepts typed builders, raw Crontab values, and crontab text.
- WorkerOptions::with_crons(...) accepts multiple typed or raw Crontab values.
- WorkerOptions::with_crontab(...) accepts crontab text, but is deprecated in favor of with_cron(...).
Typed cron builders
Use Cron when the schedule belongs to a Rust TaskHandler. The task
identifier comes from T::IDENTIFIER, so it stays aligned with the job you
registered with define_job.
use graphile_worker::{
Cron, CrontabFill, IntoTaskHandlerResult, TaskHandler, WorkerContext,
WorkerOptions,
};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SendDigest {
account_id: i64,
}
impl TaskHandler for SendDigest {
const IDENTIFIER: &'static str = "send_digest";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
// Send the digest.
}
}
let worker = WorkerOptions::default()
.define_job::<SendDigest>()
.with_cron(
Cron::daily_at::<SendDigest>(8, 0)?
.id("send_digest_morning")
.fill(CrontabFill::hours(1))
.payload(SendDigest { account_id: 42 })?,
)
.pg_pool(pg_pool)
.init()
.await?;
Typed schedules are built with:
Cron::every_minute::<MyTask>()
Cron::every_n_minutes::<MyTask>(5)?
Cron::hourly_at::<MyTask>(15)?
Cron::daily_at::<MyTask>(8, 0)?
Cron::weekly_on::<MyTask>(chrono::Weekday::Mon, 8, 0)?
Cron::monthly_on::<MyTask>(1, 8, 0)?
Cron::yearly_on::<MyTask>(1, 1, 8, 0)?
The typed constructors validate field ranges. Minutes are 0..=59, hours are
0..=23, days of month are 1..=31, and months are 1..=12.
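The std-only sketch below implements those range checks and renders each validated schedule as the equivalent five-field crontab expression. daily_at, monthly_on, and check are hypothetical free functions with String errors. The crate's real constructors are generic over the task type and have their own error type.

```rust
use std::ops::RangeInclusive;

/// Returns `value` if it lies in `range`, otherwise a descriptive error.
fn check(field: &str, value: u32, range: RangeInclusive<u32>) -> Result<u32, String> {
    if range.contains(&value) {
        Ok(value)
    } else {
        Err(format!("{field} must be in {}..={}, got {value}", range.start(), range.end()))
    }
}

/// Every day at `hour:minute` UTC.
fn daily_at(hour: u32, minute: u32) -> Result<String, String> {
    let minute = check("minute", minute, 0..=59)?;
    let hour = check("hour", hour, 0..=23)?;
    Ok(format!("{minute} {hour} * * *"))
}

/// On day `day` of every month at `hour:minute` UTC.
fn monthly_on(day: u32, hour: u32, minute: u32) -> Result<String, String> {
    let minute = check("minute", minute, 0..=59)?;
    let hour = check("hour", hour, 0..=23)?;
    let day = check("day of month", day, 1..=31)?;
    Ok(format!("{minute} {hour} {day} * *"))
}
```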
Builder options include:
Cron::every_n_minutes::<SendDigest>(10)?
.id("send_digest_every_10_minutes")
.fill(CrontabFill::minutes(30))
.max_attempts(5)
.queue("digest")
.priority(-10)
.job_key("send_digest_dedupe")
.job_key_mode(graphile_worker::CronJobKeyMode::PreserveRunAt)
.payload(SendDigest { account_id: 42 })?;
Use payload(...) for a typed payload that serializes as JSON. Use
payload_value(...) when you already have a serde_json::Value.
Multiple cron entries
Use with_crons for a collection of typed builders or Crontab values:
let worker = WorkerOptions::default()
.define_job::<SendDigest>()
.define_job::<SyncMetrics>()
.with_crons([
Cron::daily_at::<SendDigest>(8, 0)?.build(),
Cron::every_n_minutes::<SyncMetrics>(15)?.build(),
])
.pg_pool(pg_pool)
.init()
.await?;
Crontab text
with_cron also accepts crontab text and returns a Result because the string
is parsed at runtime.
let worker = WorkerOptions::default()
.define_job::<SendDigest>()
.with_cron(
r#"
# Run at 08:00 UTC every day.
0 8 * * * send_digest ?id=send_digest_morning&fill=1h {account_id:42}
"#,
)?
.pg_pool(pg_pool)
.init()
.await?;
The text format is:
* * * * * task_identifier ?options {payload}
The five schedule fields are UTC minute, UTC hour, UTC day of month, UTC month,
and UTC day of week. Days of week use 0..=6, where 0 is Sunday and 1 is Monday.
Supported schedule values are:
- * for every valid value.
- */n for every value divisible by n.
- n for one explicit value.
- a-b for an inclusive range.
- Comma-separated combinations such as 0,15,30,45 or 4,10-15.
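To make the field rules concrete, here is a small std-only sketch that expands one field into its matching values. `expand_field` is a hypothetical helper, not part of graphile_worker. It follows the rules exactly as listed, including treating `*/n` as "divisible by n" rather than "every n-th value starting from the minimum".

```rust
use std::collections::BTreeSet;

/// Expand one crontab field (for example "4,10-15" or "*/15") into the set of
/// matching values within `min..=max`. Simplified sketch of the listed rules.
fn expand_field(field: &str, min: u32, max: u32) -> Result<BTreeSet<u32>, String> {
    let mut values = BTreeSet::new();
    for part in field.split(',') {
        if part == "*" {
            // Every valid value.
            values.extend(min..=max);
        } else if let Some(step) = part.strip_prefix("*/") {
            // Every value in range that is divisible by the step.
            let n: u32 = step.parse().map_err(|_| format!("bad step: {part}"))?;
            if n == 0 {
                return Err(format!("step must be positive: {part}"));
            }
            values.extend((min..=max).filter(|v| v % n == 0));
        } else if let Some((a, b)) = part.split_once('-') {
            // Inclusive range a-b.
            let a: u32 = a.parse().map_err(|_| format!("bad range: {part}"))?;
            let b: u32 = b.parse().map_err(|_| format!("bad range: {part}"))?;
            if a > b || a < min || b > max {
                return Err(format!("range out of bounds: {part}"));
            }
            values.extend(a..=b);
        } else {
            // One explicit value.
            let v: u32 = part.parse().map_err(|_| format!("bad value: {part}"))?;
            if v < min || v > max {
                return Err(format!("value out of bounds: {part}"));
            }
            values.insert(v);
        }
    }
    Ok(values)
}
```

For the hour field (`0..=23`), `*/4` expands to 0, 4, 8, 12, 16, 20, and `4,10-15` expands to 4 and 10 through 15.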
Examples:
# Every minute.
* * * * * tick
# Every 4 hours at minute 0.
0 */4 * * * rollup
# Mondays at 04:30 UTC.
30 4 * * 1 send_weekly_email
# More than one value in a field.
30 4,10-15 * * 1 send_weekly_email
Task identifiers must start with a letter or underscore. After that they may contain letters, numbers, colon, underscore, or hyphen.
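The identifier rule can be checked with a few lines of Rust. This hypothetical `is_valid_task_identifier` assumes "letters" means ASCII letters; the crate's parser may accept a wider character set.

```rust
/// Check a task identifier: the first character is a letter or underscore,
/// the rest are letters, digits, ':', '_', or '-'. Assumes ASCII letters.
fn is_valid_task_identifier(id: &str) -> bool {
    let mut chars = id.chars();
    match chars.next() {
        Some(c) if c.is_ascii_alphabetic() || c == '_' => {}
        // Empty identifiers and bad first characters are rejected.
        _ => return false,
    }
    chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, ':' | '_' | '-'))
}
```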
Options
Crontab text options use query-string syntax and must start with ?.
0 8 * * * send_digest ?id=send_digest_morning&fill=1h&max=5&queue=digest&priority=10
Supported options are:
- id: stable identifier for this cron entry. By default the task identifier is used. Set this when the same task has more than one schedule.
- fill: backfill duration for missed executions.
- max: overrides the job's maximum attempts.
- queue: schedules the job in a named queue.
- priority: overrides the job priority.
- job_key: sets a job key for deduplication.
- job_key_mode: controls how an existing unlocked job with the same key is updated. Supported serialized values are replace and preserve_run_at.
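A rough sketch of how an options string splits into key/value pairs. `parse_options` is hypothetical: it does no percent-decoding, and rejecting unknown keys is a choice of this sketch rather than documented crate behavior.

```rust
use std::collections::BTreeMap;

/// Split a crontab options string such as "?id=x&fill=1h" into key/value
/// pairs. Sketch only: no percent-decoding; unknown keys are rejected.
fn parse_options(text: &str) -> Result<BTreeMap<String, String>, String> {
    const KNOWN: [&str; 7] = ["id", "fill", "max", "queue", "priority", "job_key", "job_key_mode"];
    let query = text
        .strip_prefix('?')
        .ok_or_else(|| "options must start with '?'".to_string())?;
    let mut options = BTreeMap::new();
    for pair in query.split('&').filter(|p| !p.is_empty()) {
        let (key, value) = pair
            .split_once('=')
            .ok_or_else(|| format!("option without value: {pair}"))?;
        if !KNOWN.contains(&key) {
            return Err(format!("unknown option: {key}"));
        }
        options.insert(key.to_string(), value.to_string());
    }
    Ok(options)
}
```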
Changing a cron entry identifier can cause the worker to treat it as a new cron
entry. Prefer setting a stable id when a task has multiple cron schedules.
Payloads
Cron jobs receive a JSON object payload. The worker adds a _cron object to the
payload for each scheduled job:
{
"_cron": {
"ts": "2026-06-16T08:00:00",
"backfilled": false
}
}
ts is the timestamp when the cron execution was due, and backfilled is
true when the job was scheduled as a backfill.
With typed builders, set the payload with payload(...) or
payload_value(...). With crontab text, write a JSON5 object after the options:
0 8 * * * send_digest ?id=send_digest_morning {account_id:42, urgent:false}
The parsed payload is merged with the default cron payload properties. The text
payload must start with { and stay on one line.
Backfill and fill
By default, cron entries do not backfill missed executions. Add fill to ask
the worker to schedule missed runs from a recent time window when it starts or
recovers a known cron entry.
Typed builder:
Cron::every_n_minutes::<SyncMetrics>(10)?
.id("sync_metrics")
.fill(CrontabFill::minutes(30))
Crontab text:
*/10 * * * * sync_metrics ?id=sync_metrics&fill=30m
Fill durations are made from time units:
- s: seconds
- m: minutes
- h: hours
- d: days
- w: weeks
Units may be combined in order, for example 1w2d3h30m. In Rust, use
CrontabFill::seconds, minutes, hours, days, weeks, or
CrontabFill::new(w, d, h, m, s).
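The fill syntax is small enough to parse by hand. This hypothetical `parse_fill` converts a string such as `1w2d3h30m` into seconds and rejects units that are out of order or repeated. It sketches the text format only; it is not the crate's parser and returns plain seconds instead of a `CrontabFill`.

```rust
/// Parse a fill duration such as "1w2d3h30m" into seconds.
/// Units must appear in descending order (w, d, h, m, s), each at most once.
fn parse_fill(text: &str) -> Result<u64, String> {
    const UNITS: [(char, u64); 5] = [('w', 604_800), ('d', 86_400), ('h', 3_600), ('m', 60), ('s', 1)];
    if text.is_empty() {
        return Err("empty fill duration".to_string());
    }
    let mut total = 0u64;
    let mut next_unit = 0; // index of the smallest unit index still allowed
    let mut digits = String::new();
    for c in text.chars() {
        if c.is_ascii_digit() {
            digits.push(c);
            continue;
        }
        // Only units after the previous one are allowed.
        let pos = UNITS[next_unit..]
            .iter()
            .position(|(u, _)| *u == c)
            .ok_or_else(|| format!("unexpected or out-of-order unit '{c}'"))?;
        let amount: u64 = digits.parse().map_err(|_| format!("missing number before '{c}'"))?;
        total += amount * UNITS[next_unit + pos].1;
        next_unit += pos + 1;
        digits.clear();
    }
    if !digits.is_empty() {
        return Err("trailing number without a unit".to_string());
    }
    Ok(total)
}
```

Under this sketch, `1w2d3h30m` is 604,800 + 172,800 + 10,800 + 1,800 = 790,200 seconds, and `30m1h` is rejected because `h` follows `m`.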
Backfill only applies to cron entries that were already known to the worker.
When a cron entry is first registered, the worker records it as known but does
not immediately create old jobs for it. Tests cover this behavior by asserting
that a newly registered cron entry has no last_execution and creates no
backfill jobs.
When a known cron entry has a previous execution timestamp, the worker can catch
up missed matching schedule times inside the configured fill window. A four-hour
cron with fill=1d, for example, can create the missed four-hour executions
from the last day. Larger fill windows can schedule more jobs and take longer at
startup.
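The catch-up window can be modeled with integer minutes. In this sketch `missed_runs` is hypothetical, time is counted in minutes from an arbitrary midnight, and a run counts as missed when it falls after the last execution, before now, and no earlier than the start of the fill window.

```rust
/// Missed schedule times, in minutes from an arbitrary midnight, that fall
/// after `last_execution`, before `now`, and inside the `fill` window.
/// Hypothetical model; `is_due` decides whether a minute matches the schedule.
fn missed_runs(last_execution: u64, now: u64, fill: u64, is_due: impl Fn(u64) -> bool) -> Vec<u64> {
    // The window starts at whichever is later: now - fill, or just after the last run.
    let window_start = now.saturating_sub(fill).max(last_execution + 1);
    (window_start..now).filter(|&minute| is_due(minute)).collect()
}
```

For a four-hour schedule with a one-day fill, `missed_runs(0, 2_880, 1_440, |m| m % 240 == 0)` returns the six runs of the second day (minutes 1,440 through 2,640); the earlier runs on the first day fall outside the window and are not backfilled.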
Runtime behavior
The cron runner checks schedule times and inserts regular jobs for matching entries. If time advances by multiple matching intervals, the runner catches up by scheduling the missed ticks it observes. Jobs are then processed by the same registered task handlers as jobs added through the regular job APIs.
Local Queue
Local Queue is an optional worker-side cache for high-throughput workloads. When enabled, the worker fetches a batch of jobs from PostgreSQL, keeps them locked locally, and lets worker concurrency drain that local batch without another database round trip for every job.
Use it when many small jobs make database fetch overhead noticeable. For slow or rare jobs, the default direct database fetch path is usually simpler and keeps less work locked inside a single process.
Basic Configuration
Enable the feature with WorkerOptions::local_queue:
use graphile_worker::{LocalQueueConfig, RefetchDelayConfig, WorkerOptions};
use std::time::Duration;
let worker = WorkerOptions::default()
.concurrency(4)
.local_queue(
LocalQueueConfig::default()
.with_size(100)
.with_ttl(Duration::from_secs(300))
.with_refetch_delay(
RefetchDelayConfig::default()
.with_duration(Duration::from_millis(100))
.with_threshold(10)
.with_max_abort_threshold(500),
),
)
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
The same settings are available through the generated builders:
use graphile_worker::{LocalQueueConfig, RefetchDelayConfig};
use std::time::Duration;
let config = LocalQueueConfig::builder()
.size(100)
.ttl(Duration::from_secs(300))
.refetch_delay(
RefetchDelayConfig::builder()
.duration(Duration::from_millis(100))
.threshold(10)
.max_abort_threshold(500)
.build(),
)
.build();
Config Fields
size controls the maximum number of jobs fetched and held by each local queue.
The default is 100. It must be greater than zero and must not exceed
i32::MAX.
ttl controls how long fetched jobs may remain unclaimed in the local queue
before they are returned to PostgreSQL. The default is five minutes.
refetch_delay is optional. It slows the next fetch after a low-yield fetch so
the worker does not immediately poll the database again when the queue appears
empty or nearly empty.
queue_count controls how many independent local queues run inside one worker.
The default is 1. It must be greater than zero. size applies per queue, so
the maximum local capacity is size * queue_count.
If queue_count is greater than worker concurrency, it is capped at the
concurrency value because each local queue needs a worker draining it.
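The capacity rule reduces to one line of arithmetic. `max_local_capacity` is a hypothetical planning helper that applies the cap on `queue_count` described above.

```rust
/// Upper bound on jobs held locally by one worker. Hypothetical helper:
/// queue_count is capped at concurrency, and size applies per queue.
fn max_local_capacity(size: usize, queue_count: usize, concurrency: usize) -> usize {
    assert!(size > 0 && queue_count > 0, "size and queue_count must be greater than zero");
    size * queue_count.min(concurrency)
}
```

With size 50, queue_count 4, and concurrency 2, the cap leaves two queues and at most 100 locally held jobs.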
Modes
Each local queue moves through a small state machine:
- Polling: the queue may fetch jobs from PostgreSQL.
- Waiting: jobs are cached locally and are being served from memory.
- TtlExpired: the local cache TTL expired and unclaimed jobs are being returned to PostgreSQL.
- Released: the local queue has shut down and will not provide more jobs.
When the cached batch is drained, the queue returns to Polling. When TTL
expires, unclaimed jobs are returned to PostgreSQL and later fetches can poll
again.
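The mode transitions can be summarized as a small state machine. This is a simplified model: the `Event` names are hypothetical, and the `Starting` mode that appears among the hook mode values is left out.

```rust
/// Simplified model of the local queue modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Polling,
    Waiting,
    TtlExpired,
    Released,
}

/// Hypothetical events that drive the transitions.
#[derive(Debug, Clone, Copy)]
enum Event {
    JobsFetched,  // a fetch returned at least one job
    BatchDrained, // every cached job was claimed by a handler
    TtlElapsed,   // the cache TTL expired with jobs still cached
    JobsReturned, // unclaimed jobs were handed back to PostgreSQL
    Shutdown,     // the worker released the local queue
}

fn next_mode(mode: Mode, event: Event) -> Mode {
    match (mode, event) {
        // Released is final.
        (Mode::Released, _) => Mode::Released,
        (_, Event::Shutdown) => Mode::Released,
        (Mode::Polling, Event::JobsFetched) => Mode::Waiting,
        (Mode::Waiting, Event::BatchDrained) => Mode::Polling,
        (Mode::Waiting, Event::TtlElapsed) => Mode::TtlExpired,
        (Mode::TtlExpired, Event::JobsReturned) => Mode::Polling,
        // Any other event leaves the mode unchanged in this model.
        (mode, _) => mode,
    }
}
```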
Refetch Delay
Refetch delay starts when a fetch returns fewer jobs than requested and the number of jobs fetched is not greater than the configured threshold.
use graphile_worker::{LocalQueueConfig, RefetchDelayConfig};
use std::time::Duration;
let config = LocalQueueConfig::default().with_refetch_delay(
RefetchDelayConfig::default()
.with_duration(Duration::from_millis(200))
.with_threshold(5),
);
The actual delay is jittered between half of duration and the full duration.
duration must not be larger than the worker poll_interval.
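The trigger condition and the jitter range can be written as two small functions. Both are hypothetical sketches; `random_permille` (0 to 1000) stands in for whatever random source the crate uses, which is not described here.

```rust
use std::time::Duration;

/// Start a refetch delay only when the fetch came back short and the number
/// of fetched jobs is at or below `threshold`.
fn should_delay_refetch(fetched: usize, requested: usize, threshold: usize) -> bool {
    fetched < requested && fetched <= threshold
}

/// Jitter `duration` into the range [duration / 2, duration].
/// `random_permille` (0..=1000) stands in for a random source.
fn jittered_delay(duration: Duration, random_permille: u32) -> Duration {
    let half = duration / 2;
    let spread = duration - half;
    half + spread * random_permille.min(1000) / 1000
}
```

With the 200 ms duration from the example configuration, the delay lands between 100 ms and 200 ms, for example 150 ms at the midpoint.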
PostgreSQL notifications can wake the local queue sooner. While a refetch delay
is active, incoming job pulses increment an internal counter. When that counter
reaches the abort threshold, the delay is aborted and the queue fetches again.
If max_abort_threshold is not set, the abort threshold is randomized up to
5 * size. If it is set, the randomized abort threshold is capped by that
value.
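The abort rule can be sketched the same way. `abort_threshold` and `pulse_aborts_delay` are hypothetical; `random_permille` again stands in for the crate's random source, and the counter is a plain `usize` rather than the crate's internal state.

```rust
/// Abort threshold for a refetch delay: a random value up to `5 * size`,
/// capped by `max_abort_threshold` when it is set.
/// `random_permille` (0..=1000) stands in for a random source.
fn abort_threshold(size: usize, max_abort_threshold: Option<usize>, random_permille: usize) -> usize {
    let randomized = 5 * size * random_permille.min(1000) / 1000;
    match max_abort_threshold {
        Some(cap) => randomized.min(cap),
        None => randomized,
    }
}

/// Record one job pulse during a delay; true once the delay should abort.
fn pulse_aborts_delay(pulses_seen: &mut usize, threshold: usize) -> bool {
    *pulses_seen += 1;
    *pulses_seen >= threshold
}
```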
Distribution
Local Queue does not change job ownership rules. Jobs are still locked in PostgreSQL under the worker id, and the worker's normal concurrency controls how many handlers can run at the same time.
With a single local queue, one batch is shared by the worker tasks. With
queue_count greater than one, the worker starts multiple independent local
queues and checks them round-robin. This can improve throughput for very small,
high-volume jobs, but it can also lock more work inside one process:
use graphile_worker::LocalQueueConfig;
let config = LocalQueueConfig::default()
.with_size(50)
.with_queue_count(4);
In this example, up to 200 jobs may be locked locally if worker concurrency is
at least 4.
Shutdown And TTL
Local Queue returns unclaimed cached jobs to PostgreSQL in two cases:
- the queue TTL expires while jobs are still cached;
- the worker shuts down and releases the local queue.
Returning jobs is retried internally. Jobs that are already running are handled by the worker's normal shutdown behavior; the local queue release path is for jobs that were fetched into the local cache but not yet claimed by a handler.
Choose ttl based on how much work you are comfortable locking inside one
process if handlers are slower than the local batch drain rate.
Forbidden Flags
Workers configured with forbidden flags do not use the local queue cache. During worker initialization, a non-empty forbidden flag list disables the local queue configuration for that worker.
This preserves flag filtering semantics: jobs with forbidden flags are skipped by the worker, and eligible jobs are fetched directly from PostgreSQL with the forbidden flag filter applied.
Worker Recovery
Worker recovery is an opt-in safety mechanism for jobs that remain locked after a worker stops unexpectedly. It is useful when a process crashes, is aborted, loses database connectivity, or is terminated by an orchestrator before it can release its in-flight jobs.
When recovery is enabled, workers write heartbeat rows to PostgreSQL. A sweeper then looks for workers whose heartbeat is stale and returns their locked jobs to the queue.
Enabling Recovery
Recovery is disabled by default. You can enable it with WorkerRecoveryConfig:
use graphile_worker::{WorkerOptions, WorkerRecoveryConfig};
use std::time::Duration;
let recovery = WorkerRecoveryConfig::default()
.enabled(true)
.heartbeat_interval(Duration::from_secs(30))
.sweep_interval(Duration::from_secs(60))
.sweep_threshold(Duration::from_secs(300))
.recovery_delay(Duration::from_secs(30));
let worker = WorkerOptions::default()
.worker_recovery(recovery)
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
The convenience setters on WorkerOptions also enable recovery:
use graphile_worker::WorkerOptions;
use std::time::Duration;
let worker = WorkerOptions::default()
.heartbeat_interval(Duration::from_secs(30))
.sweep_interval(Duration::from_secs(60))
.sweep_threshold(Duration::from_secs(300))
.recovery_delay(Duration::from_secs(30))
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
Configuration
WorkerRecoveryConfig::default() uses these values:
| Option | Default | Meaning |
|---|---|---|
| enabled | false | Whether heartbeat registration and sweeping are active. |
| heartbeat_interval | 30 seconds | How often an enabled worker updates its heartbeat row. |
| sweep_interval | 60 seconds | How often the background sweeper checks for inactive workers. |
| sweep_threshold | 5 minutes | How old a heartbeat must be before the worker is considered inactive. |
| recovery_delay | 30 seconds | Delay before recovered jobs are eligible to run again. |
| resilient_sweep_threshold_multiplier | 3 | Multiplier applied to sweep_threshold for workers holding resilient jobs. |
| resilient_job_flags | ["infrastructure_resilient"] | Job flags that activate the extended threshold. |
Each builder method on WorkerRecoveryConfig stores the new value and enables
recovery, except .enabled(false), which explicitly disables it again.
Heartbeats
An enabled worker registers itself in the private workers table and refreshes
last_heartbeat_at at the configured heartbeat_interval.
You can inspect registered workers with WorkerUtils::list_active_workers.
The stale state is calculated from the threshold you pass to that method:
use std::time::Duration;
let workers = utils
.list_active_workers(Duration::from_secs(60))
.await?;
for worker in workers {
println!(
"worker={} stale={} started_at={} last_heartbeat_at={}",
worker.worker_id,
worker.is_stale,
worker.started_at,
worker.last_heartbeat_at
);
}
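The stale flag is a comparison between the heartbeat age and the threshold you pass in. This hypothetical `is_stale` helper shows that comparison with `SystemTime`; treating a heartbeat timestamp in the future as not stale is an assumption of the sketch.

```rust
use std::time::{Duration, SystemTime};

/// Report whether a heartbeat is older than `threshold` at time `now`.
/// Hypothetical helper illustrating the `is_stale` comparison.
fn is_stale(last_heartbeat_at: SystemTime, now: SystemTime, threshold: Duration) -> bool {
    match now.duration_since(last_heartbeat_at) {
        Ok(age) => age > threshold,
        // Assumption: a heartbeat in the future (clock skew) is not stale.
        Err(_) => false,
    }
}
```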
Sweeping Stale Workers
The background sweeper runs at sweep_interval. During a sweep it:
- takes a transaction-scoped PostgreSQL advisory lock, so concurrent sweepers do not recover the same jobs twice;
- finds workers whose heartbeat is older than sweep_threshold (or the extended threshold for workers holding resilient jobs);
- also finds orphan job locks whose locked_by worker is no longer registered;
- unlocks the jobs held by those workers;
- decrements each recovered job's attempt count;
- releases queue locks for queued jobs;
- moves run_at forward by recovery_delay;
- records "Job recovered after worker interruption" as the recovered job error.
If another worker already holds the sweep lock, the sweep exits without recovering jobs.
Manual Sweeps
Use WorkerUtils::sweep_stale_workers when you want an operator command, admin
endpoint, or scheduled task to run recovery directly.
use graphile_worker::{SweepStaleWorkersOptions, WorkerUtils};
use std::time::Duration;
let utils = WorkerUtils::new(database, "graphile_worker".to_string());
let result = utils
.sweep_stale_workers(SweepStaleWorkersOptions {
sweep_threshold: Some(Duration::from_secs(60)),
recovery_delay: Some(Duration::from_secs(30)),
dry_run: false,
})
.await?;
println!(
"recovered {} jobs from {:?}",
result.recovered_count,
result.worker_ids
);
SweepStaleWorkersOptions can override sweep_threshold and recovery_delay
for a single sweep. Leave either field as None to use the recovery config
defaults. Set dry_run: true to return the worker IDs that would be considered
dead without recovering jobs or deleting stale worker rows.
When you need custom resilient settings for a manual sweep, pass a
WorkerRecoveryConfig explicitly:
use graphile_worker::{SweepStaleWorkersOptions, WorkerRecoveryConfig};
use std::time::Duration;
let config = WorkerRecoveryConfig::default()
.sweep_threshold(Duration::from_secs(60))
.resilient_sweep_threshold_multiplier(3);
let result = utils
.sweep_stale_workers_with_config(&config, SweepStaleWorkersOptions {
recovery_delay: Some(Duration::from_secs(30)),
dry_run: false,
..Default::default()
})
.await?;
Resilient Jobs
Some jobs are expected to run for a long time. Recovery supports a resilient flag so those jobs are not reclaimed as quickly as ordinary work.
By default, a job is resilient when its flags include
"infrastructure_resilient": true. Workers holding resilient jobs use:
effective threshold = sweep_threshold * resilient_sweep_threshold_multiplier
The default multiplier is 3.
use graphile_worker::{JobSpec, INFRASTRUCTURE_RESILIENT_FLAG};
let job = utils
.add_job(
LongRunningJob { id: 1 },
JobSpec::builder()
.flags(vec![INFRASTRUCTURE_RESILIENT_FLAG.to_string()])
.build(),
)
.await?;
You can configure a different flag list:
use graphile_worker::WorkerRecoveryConfig;
let recovery = WorkerRecoveryConfig::default()
.resilient_job_flags(vec!["custom_resilient".to_string()]);
A configured flag must be present and truthy on the job for the extended threshold to apply.
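The resilient threshold rule can be sketched as follows. `effective_threshold` is hypothetical, job flags are modeled as a map from flag name to `bool` (a simplification of "present and truthy"), and any held resilient job extends the threshold for the whole worker, following the description of workers holding resilient jobs above.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Choose the sweep threshold for one worker from the flags of its held jobs.
/// Hypothetical helper: flags are modeled as name -> bool, and a configured
/// flag counts only when it is present and `true`.
fn effective_threshold(
    sweep_threshold: Duration,
    multiplier: u32,
    resilient_flags: &[&str],
    held_job_flags: &[BTreeMap<String, bool>],
) -> Duration {
    let holds_resilient_job = held_job_flags.iter().any(|flags| {
        resilient_flags
            .iter()
            .any(|name| flags.get(*name).copied().unwrap_or(false))
    });
    if holds_resilient_job {
        // effective threshold = sweep_threshold * resilient_sweep_threshold_multiplier
        sweep_threshold * multiplier
    } else {
        sweep_threshold
    }
}
```

With the defaults, a worker holding an `infrastructure_resilient` job is considered inactive only after 5 minutes * 3 = 15 minutes without a heartbeat.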
Recovery Hooks
Recovery hooks let your application decide what to do with each job recovered from a crashed worker. The hook receives the job, the recovering worker ID, the previous worker ID, and the interruption reason.
use graphile_worker::{
FailureReason, HookRegistry, JobRecovery, JobRecoveryResult, SweepStaleWorkersOptions,
WorkerUtils,
};
use std::sync::Arc;
let mut hooks = HookRegistry::new();
hooks.on(JobRecovery, |ctx| async move {
if ctx.reason == FailureReason::WorkerCrashed {
JobRecoveryResult::Default
} else {
JobRecoveryResult::Skip
}
});
let utils = WorkerUtils::new(database, "graphile_worker".to_string())
.with_hooks(Arc::new(hooks));
let result = utils
.sweep_stale_workers(SweepStaleWorkersOptions::default())
.await?;
The hook result controls recovery for that job:
| Result | Effect |
|---|---|
| JobRecoveryResult::Default | Unlock the job, decrement attempts, and delay it by recovery_delay. |
| JobRecoveryResult::Reschedule { run_at, attempts } | Unlock the job, set run_at, and optionally replace attempts. |
| JobRecoveryResult::FailWithBackoff | Fail the job with the normal retry backoff and WorkerCrashed as the error. |
| JobRecoveryResult::Skip | Leave the job locked and do not count it as recovered. |
After a hook handles a job with any result except Skip, the normal job
interrupted hook is emitted with FailureReason::WorkerCrashed. After a sweep
finishes, worker recovery hooks are emitted with the recovered worker IDs and
the recovered job count.
Lifecycle Hooks
Lifecycle hooks let you observe worker activity or intercept specific points in the job lifecycle. They are registered through plugins and run inside the worker process that owns the hook registry.
Use hooks for logging, metrics, validation, schedule-time payload adjustment, and local queue diagnostics. Keep hook handlers fast and focused: they run on the worker path they observe or intercept.
Registering Hooks
A plugin implements Plugin and adds handlers to the HookRegistry:
use graphile_worker::{HookRegistry, JobComplete, Plugin, WorkerStart};
struct LoggingPlugin;
impl Plugin for LoggingPlugin {
fn register(self, hooks: &mut HookRegistry) {
hooks.on(WorkerStart, async |ctx| {
println!("worker {} started", ctx.worker_id);
});
hooks.on(JobComplete, async |ctx| {
println!("job {} completed in {:?}", ctx.job.id(), ctx.duration);
});
}
}
Add plugins when building the worker:
let worker = graphile_worker::WorkerOptions::default()
.define_job::<ProcessData>()
.pg_pool(pg_pool)
.add_plugin(LoggingPlugin)
.init()
.await?;
Multiple plugins can be added with repeated .add_plugin(...) calls.
Observer vs Interceptor Hooks
Hooks are split into two categories.
Observer hooks return () and are for side effects. They are useful for
metrics, tracing, logging, and state snapshots. When an observer event is
emitted, every registered handler for that event is called.
Interceptor hooks return a result that can affect worker behavior. The common
result type is HookResult:
use graphile_worker::HookResult;
HookResult::Continue
HookResult::Skip
HookResult::Fail("reason".to_string())
Continue allows the operation to proceed. Skip and Fail(...) are terminal
results and stop the interceptor chain.
BeforeJobSchedule uses JobScheduleResult instead. It can continue with a
payload, skip scheduling, or fail scheduling:
use graphile_worker::JobScheduleResult;
JobScheduleResult::Continue(payload)
JobScheduleResult::Skip
JobScheduleResult::Fail("reason".to_string())
Schedule interceptors are chained through the payload. If one plugin returns
JobScheduleResult::Continue(new_payload), the next schedule interceptor
receives that transformed payload. A terminal result stops later schedule
interceptors.
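A minimal sketch of payload chaining, using a hypothetical `ScheduleOutcome` enum with a `String` payload in place of `JobScheduleResult` and a JSON value. Each interceptor receives the payload returned by the previous one, and the first terminal result is returned as-is.

```rust
/// Hypothetical stand-in for JobScheduleResult, with a String payload.
#[derive(Debug, PartialEq)]
enum ScheduleOutcome {
    Continue(String),
    Skip,
    Fail(String),
}

/// Run schedule interceptors in order. A Continue passes its payload to the
/// next interceptor; Skip or Fail stops the chain immediately.
fn run_schedule_chain(payload: String, interceptors: &[fn(String) -> ScheduleOutcome]) -> ScheduleOutcome {
    let mut current = payload;
    for interceptor in interceptors {
        match interceptor(current) {
            // Hand the transformed payload to the next interceptor.
            ScheduleOutcome::Continue(next) => current = next,
            // Skip and Fail end the chain.
            terminal => return terminal,
        }
    }
    ScheduleOutcome::Continue(current)
}
```

Moving the payload through the loop by value means each interceptor owns what it receives, so no clone is needed between steps.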
Worker Hooks
Worker hooks observe the worker process itself.
| Hook | Context highlights | Type |
|---|---|---|
| WorkerInit | database, schema, concurrency | observer |
| WorkerStart | database, worker_id, extensions | observer |
| WorkerShutdown | database, worker_id, reason | observer |
| WorkerRecovered | worker_id, dead_worker_ids, jobs_recovered | observer |
Example:
use graphile_worker::{HookRegistry, Plugin, WorkerShutdown, WorkerStart};
struct WorkerLogPlugin;
impl Plugin for WorkerLogPlugin {
fn register(self, hooks: &mut HookRegistry) {
hooks.on(WorkerStart, async |ctx| {
println!("worker {} started", ctx.worker_id);
});
hooks.on(WorkerShutdown, async |ctx| {
println!("worker {} stopped: {:?}", ctx.worker_id, ctx.reason);
});
}
}
Job Run Hooks
Run hooks observe or intercept jobs as they are fetched and executed.
| Hook | Context highlights | Type |
|---|---|---|
| JobFetch | job, worker_id | observer |
| BeforeJobRun | job, worker_id, payload | interceptor |
| JobStart | job, worker_id | observer |
| AfterJobRun | job, worker_id, result, duration | interceptor |
| JobComplete | job, worker_id, duration | observer |
| JobFail | job, worker_id, error, will_retry | observer |
| JobPermanentlyFail | job, worker_id, error | observer |
| JobInterrupted | job, worker_id, reason | observer |
BeforeJobRun can prevent a task handler from running:
use graphile_worker::{BeforeJobRun, HookRegistry, HookResult, Plugin};
struct ValidationPlugin;
impl Plugin for ValidationPlugin {
fn register(self, hooks: &mut HookRegistry) {
hooks.on(BeforeJobRun, async |ctx| {
let should_skip = ctx
.payload
.get("skip")
.and_then(|value| value.as_bool())
.unwrap_or(false);
if should_skip {
return HookResult::Skip;
}
let should_fail = ctx
.payload
.get("force_fail")
.and_then(|value| value.as_bool())
.unwrap_or(false);
if should_fail {
return HookResult::Fail("forced failure by validation hook".into());
}
HookResult::Continue
});
}
}
When a BeforeJobRun hook skips a job, the task handler is not called. When it
fails a job, the task handler is also not called and the normal failure path is
used.
AfterJobRun sees the task result and duration. It also returns HookResult,
so it can leave the result alone with Continue, mark the job skipped with
Skip, or fail it with Fail(...).
Schedule Hooks
BeforeJobSchedule intercepts jobs before they are persisted. It receives the
task identifier, JSON payload, and JobSpec.
Use it to reject, validate, or transform scheduled jobs:
use graphile_worker::{BeforeJobSchedule, HookRegistry, JobScheduleResult, Plugin};
struct SchedulePolicyPlugin;
impl Plugin for SchedulePolicyPlugin {
fn register(self, hooks: &mut HookRegistry) {
hooks.on(BeforeJobSchedule, async |ctx| {
if ctx.identifier == "blocked_task" {
return JobScheduleResult::Skip;
}
let mut payload = ctx.payload.clone();
if let Some(object) = payload.as_object_mut() {
object.insert("checked_by_hook".into(), serde_json::json!(true));
}
JobScheduleResult::Continue(payload)
});
}
}
If a schedule hook returns Skip, no job is inserted. If it returns
Fail(...), scheduling returns an error. If it returns Continue(payload),
that payload is used for the job and passed to any later schedule interceptor.
Cron scheduling also has observer hooks. They are listed here together with the BeforeJobSchedule interceptor:
| Hook | Context highlights | Type |
|---|---|---|
| CronTick | timestamp, crontabs | observer |
| CronJobScheduled | crontab, scheduled_at | observer |
| BeforeJobSchedule | identifier, payload, spec | interceptor |
Local Queue Hooks
Local queue hooks observe the in-process queue used by a worker.
| Hook | Context highlights | Type |
|---|---|---|
| LocalQueueInit | worker_id | observer |
| LocalQueueSetMode | worker_id, old_mode, new_mode | observer |
| LocalQueueGetJobsComplete | worker_id, jobs_count | observer |
| LocalQueueReturnJobs | worker_id, jobs_count | observer |
| LocalQueueRefetchDelayStart | worker_id, duration, threshold, abort_threshold | observer |
| LocalQueueRefetchDelayAbort | worker_id, count, abort_threshold | observer |
| LocalQueueRefetchDelayExpired | worker_id | observer |
The local queue mode values are Starting, Polling, Waiting,
TtlExpired, and Released.
Example:
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use graphile_worker::{
HookRegistry, LocalQueueGetJobsComplete, LocalQueueSetMode, Plugin,
};
#[derive(Clone)]
struct QueueMetricsPlugin {
max_fetched: Arc<AtomicUsize>,
}
impl Plugin for QueueMetricsPlugin {
fn register(self, hooks: &mut HookRegistry) {
let max_fetched = self.max_fetched.clone();
hooks.on(LocalQueueGetJobsComplete, move |ctx| {
let max_fetched = max_fetched.clone();
async move {
max_fetched.fetch_max(ctx.jobs_count, Ordering::Relaxed);
}
});
hooks.on(LocalQueueSetMode, async |ctx| {
println!(
"local queue mode changed: {:?} -> {:?}",
ctx.old_mode,
ctx.new_mode
);
});
}
}
Recovery Hook
JobRecovery is an interceptor for recovered jobs. Its context includes the
job, the current worker_id, the previous_worker_id, and the recovery
reason.
It returns JobRecoveryResult:
use graphile_worker::JobRecoveryResult;
JobRecoveryResult::Default
JobRecoveryResult::Reschedule {
run_at,
attempts: Some(3),
}
JobRecoveryResult::FailWithBackoff
JobRecoveryResult::Skip
Use Default to keep the worker's standard recovery behavior. Use the other
variants only when the recovered job needs explicit policy.
Practical Example
This plugin combines observer hooks for metrics with an interceptor for payload-based validation:
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use graphile_worker::{
BeforeJobRun, HookRegistry, HookResult, JobComplete, JobFail, JobStart, Plugin,
};
#[derive(Default)]
struct Counters {
started: AtomicU64,
completed: AtomicU64,
failed: AtomicU64,
}
struct MetricsAndValidationPlugin {
counters: Arc<Counters>,
}
impl Plugin for MetricsAndValidationPlugin {
fn register(self, hooks: &mut HookRegistry) {
let counters = self.counters.clone();
hooks.on(JobStart, move |_ctx| {
let counters = counters.clone();
async move {
counters.started.fetch_add(1, Ordering::Relaxed);
}
});
let counters = self.counters.clone();
hooks.on(JobComplete, move |_ctx| {
let counters = counters.clone();
async move {
counters.completed.fetch_add(1, Ordering::Relaxed);
}
});
let counters = self.counters.clone();
hooks.on(JobFail, move |_ctx| {
let counters = counters.clone();
async move {
counters.failed.fetch_add(1, Ordering::Relaxed);
}
});
hooks.on(BeforeJobRun, async |ctx| {
if ctx
.payload
.get("skip")
.and_then(|value| value.as_bool())
.unwrap_or(false)
{
return HookResult::Skip;
}
HookResult::Continue
});
}
}
See the repository examples in examples/hooks.rs and examples/hooks/*.rs
for a runnable version with logging, metrics, validation, and sample jobs.
Job Management
WorkerUtils is the operational API for managing Graphile Worker jobs from Rust
code. Use it to schedule work, cancel keyed jobs, complete or fail selected
jobs, reschedule jobs, clean up old metadata, run migrations, and recover locks
left behind by stopped workers.
Most applications get a WorkerUtils instance from an initialized worker:
let worker = graphile_worker::WorkerOptions::default()
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
let utils = worker.create_utils();
Operator tools that do not run a worker can construct utilities directly from a database handle and schema:
use graphile_worker::WorkerUtils;
let utils = WorkerUtils::new(database, "graphile_worker".to_string());
utils.migrate().await?;
Scheduling Jobs
Use add_job when the task type is known at compile time. The task handler's
IDENTIFIER is used as the job identifier, and the payload is serialized to
JSON.
use graphile_worker::{IntoTaskHandlerResult, JobSpec, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
Ok::<(), String>(())
}
}
let job = utils
.add_job(
SendEmail {
to: "user@example.com".to_string(),
},
JobSpec::default(),
)
.await?;
Use add_raw_job when the identifier is dynamic or when building an operator
tool that schedules jobs without linking the task type.
use graphile_worker::JobSpec;
use serde_json::json;
let job = utils
.add_raw_job(
"send_email",
json!({ "to": "user@example.com" }),
JobSpec::default(),
)
.await?;
WorkerUtils also supports batch insertion:
let spec = JobSpec::default();
let jobs = vec![
(SendEmail { to: "a@example.com".to_string() }, &spec),
(SendEmail { to: "b@example.com".to_string() }, &spec),
];
let added = utils.add_jobs(&jobs).await?;
For mixed identifiers, use add_raw_jobs with RawJobSpec values:
use graphile_worker::{JobSpec, RawJobSpec};
use serde_json::json;
let added = utils
.add_raw_jobs(&[
RawJobSpec {
identifier: "send_email".to_string(),
payload: json!({ "to": "a@example.com" }),
spec: JobSpec::default(),
},
RawJobSpec {
identifier: "generate_report".to_string(),
payload: json!({ "report_id": 42 }),
spec: JobSpec::default(),
},
])
.await?;
Empty add_jobs and add_raw_jobs calls return an empty vector without writing
to the database. For large batches, the utility attempts to run ANALYZE
on the jobs table after inserting at least 10,000 jobs.
Job Keys
Set JobSpec::job_key to deduplicate work. Adding another job with the same key
updates the existing job instead of creating another row. In the tested behavior,
the later payload replaces the earlier payload and the job revision increments.
use graphile_worker::{JobKeyMode, JobSpec};
utils
.add_job(
SendEmail {
to: "user@example.com".to_string(),
},
JobSpec {
job_key: Some("welcome-email:user-123".to_string()),
job_key_mode: Some(JobKeyMode::Replace),
..Default::default()
},
)
.await?;
JobKeyMode::PreserveRunAt keeps the existing scheduled time when the keyed job
is updated. JobKeyMode::Replace replaces it with the new run_at.
JobKeyMode::UnsafeDedupe is supported for individual add_job and
add_raw_job calls, but batch insertion rejects it. If a batch contains any
PreserveRunAt job key mode, the batch insert applies preserve-run-at behavior
uniformly.
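The key-mode rules can be modeled with a `HashMap`. This is an in-memory sketch, not the crate's implementation: `KeyMode`, `KeyedJob`, and `add_keyed_job` are hypothetical, locking and `UnsafeDedupe` are ignored, times are plain integers, and the revision starts at 0 in this model.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for JobKeyMode (UnsafeDedupe omitted).
#[derive(Debug, Clone, Copy, PartialEq)]
enum KeyMode {
    Replace,
    PreserveRunAt,
}

/// Hypothetical stored job row.
#[derive(Debug)]
struct KeyedJob {
    payload: String,
    run_at: u64,
    revision: u32,
}

/// Insert a job, or update the existing job with the same key.
fn add_keyed_job(jobs: &mut HashMap<String, KeyedJob>, key: &str, payload: &str, run_at: u64, mode: KeyMode) {
    jobs.entry(key.to_string())
        .and_modify(|job| {
            // The later payload replaces the earlier one; the revision increments.
            job.payload = payload.to_string();
            job.revision += 1;
            // PreserveRunAt keeps the existing scheduled time.
            if mode == KeyMode::Replace {
                job.run_at = run_at;
            }
        })
        .or_insert(KeyedJob { payload: payload.to_string(), run_at, revision: 0 });
}
```

Adding `welcome-email:user-123` with `Replace` at time 100 and then again with `PreserveRunAt` at time 200 leaves one job whose payload is the second one and whose run_at is still 100.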
Cancel a keyed job with remove_job:
utils.remove_job("welcome-email:user-123").await?;
Batch Task Jobs
For handlers that implement BatchTaskHandler, add_batch_job stores the
payload as a JSON array in a single job row.
let job = utils
.add_batch_job(
vec![
SendEmail { to: "a@example.com".to_string() },
SendEmail { to: "b@example.com".to_string() },
],
JobSpec::default(),
)
.await?;
The payload list must contain at least one item. If a BeforeJobSchedule hook
is installed, it must return a JSON array for batch jobs and must not return an
empty array.
Completing, Failing, and Rescheduling Jobs
Administrative job actions take job ids and return the jobs that were changed.
Locked jobs are left untouched by complete_jobs, permanently_fail_jobs, and
reschedule_jobs.
let completed = utils.complete_jobs(&[101, 102]).await?;
let failed = utils
.permanently_fail_jobs(&[103], "invalid payload")
.await?;
Permanent failure sets last_error to the supplied reason and sets attempts
to max_attempts.
Use RescheduleJobOptions to update only the fields you need:
use chrono::Utc;
use graphile_worker::worker_utils::types::RescheduleJobOptions;
let rescheduled = utils
.reschedule_jobs(
&[104, 105],
RescheduleJobOptions {
run_at: Some(Utc::now() + chrono::Duration::minutes(5)),
priority: Some(10),
attempts: Some(0),
max_attempts: Some(25),
},
)
.await?;
If run_at is not supplied, the database function schedules matching jobs to
run immediately. priority, attempts, and max_attempts are optional and are
left unchanged when omitted.
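The optional-field semantics of RescheduleJobOptions can be sketched with `Option` fields. `JobRow`, `Reschedule`, and `apply_reschedule` are hypothetical; `now` is passed in explicitly, and locked jobs, which the real call leaves untouched, are not modeled.

```rust
/// Hypothetical model of a job row.
#[derive(Debug, PartialEq)]
struct JobRow {
    run_at: u64,
    priority: i32,
    attempts: i32,
    max_attempts: i32,
}

/// Hypothetical stand-in for RescheduleJobOptions.
#[derive(Default)]
struct Reschedule {
    run_at: Option<u64>,
    priority: Option<i32>,
    attempts: Option<i32>,
    max_attempts: Option<i32>,
}

/// Apply the options: a missing run_at means "now"; other missing fields
/// leave the job unchanged.
fn apply_reschedule(job: &mut JobRow, opts: &Reschedule, now: u64) {
    job.run_at = opts.run_at.unwrap_or(now);
    if let Some(priority) = opts.priority {
        job.priority = priority;
    }
    if let Some(attempts) = opts.attempts {
        job.attempts = attempts;
    }
    if let Some(max_attempts) = opts.max_attempts {
        job.max_attempts = max_attempts;
    }
}
```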
Cleanup
cleanup runs one or more maintenance tasks:
use graphile_worker::worker_utils::types::CleanupTask;
utils
.cleanup(&[
CleanupTask::GcTaskIdentifiers,
CleanupTask::GcJobQueues,
CleanupTask::DeletePermanentlyFailedJobs,
])
.await?;
The available cleanup tasks are:
- GcTaskIdentifiers: removes task identifiers no longer referenced by jobs.
- GcJobQueues: removes queue records no longer referenced by jobs.
- DeletePermanentlyFailedJobs: deletes unlocked jobs whose attempts have reached max_attempts.
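Permanent failure and the DeletePermanentlyFailedJobs cleanup fit together: permanently failing a job sets attempts to max_attempts, which is what the cleanup filter looks for. A hypothetical `Job` struct models both rules here; it is a sketch of the described behavior, not the crate's SQL.

```rust
/// Hypothetical model of a job row for the failure and cleanup rules.
#[derive(Debug)]
struct Job {
    locked: bool,
    attempts: i32,
    max_attempts: i32,
    last_error: Option<String>,
}

/// Model of permanently_fail_jobs for one job: locked jobs are left
/// untouched; otherwise record the reason and exhaust the attempts.
fn permanently_fail(job: &mut Job, reason: &str) {
    if job.locked {
        return;
    }
    job.last_error = Some(reason.to_string());
    job.attempts = job.max_attempts;
}

/// Model of the DeletePermanentlyFailedJobs filter.
fn is_deletable(job: &Job) -> bool {
    !job.locked && job.attempts >= job.max_attempts
}
```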
When utilities come from a worker, GcTaskIdentifiers preserves task
identifiers known by that worker and refreshes its task details afterward. This
keeps horizontally scaled workers able to pick up newly scheduled jobs for
registered task handlers after cleanup.
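The rule for which task identifiers survive GcTaskIdentifiers can be sketched with two sets. The identifier is kept if jobs still reference it, or if the calling worker has registered it. This is a hypothetical model of the documented behavior using in-memory sets. The real cleanup runs as SQL, and gc_task_identifiers is not a library function.

```rust
use std::collections::HashSet;

/// Hypothetical model of GcTaskIdentifiers: keep identifiers still referenced
/// by jobs, plus identifiers the calling worker has registered handlers for.
/// When utilities do not come from a worker, `known_by_worker` is simply empty.
fn gc_task_identifiers(
    stored: &[&str],
    referenced_by_jobs: &HashSet<&str>,
    known_by_worker: &HashSet<&str>,
) -> Vec<String> {
    stored
        .iter()
        .filter(|id| referenced_by_jobs.contains(*id) || known_by_worker.contains(*id))
        .map(|id| id.to_string())
        .collect()
}

fn main() {
    let referenced: HashSet<&str> = ["send_email"].into_iter().collect();
    let known: HashSet<&str> = ["build_report"].into_iter().collect();
    let stored = ["send_email", "build_report", "old_task"];
    // "old_task" is neither referenced nor registered, so it is collected.
    println!("{:?}", gc_task_identifiers(&stored, &referenced, &known));
}
```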
Worker Recovery Operations
For deployments using worker recovery, WorkerUtils can inspect heartbeat
workers and run recovery sweeps manually.
use std::time::Duration;
let workers = utils
.list_active_workers(Duration::from_secs(300))
.await?;
Run a stale-worker sweep to recover jobs locked by workers that stopped heartbeating, and jobs or queues locked by worker ids that are no longer registered:
use graphile_worker::SweepStaleWorkersOptions;
use std::time::Duration;
let result = utils
.sweep_stale_workers(SweepStaleWorkersOptions {
sweep_threshold: Some(Duration::from_secs(300)),
recovery_delay: Some(Duration::from_secs(30)),
dry_run: true,
..Default::default()
})
.await?;
Use dry_run: true for inspection before making changes. When you need the
sweep to use a specific recovery configuration, call
sweep_stale_workers_with_config.
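A sweep can pick up two kinds of candidates. The first is a registered worker whose last heartbeat is older than the threshold. The second is a lock holder that is not registered at all. The sketch below models both cases, and how dry_run reports without releasing anything. It is a hypothetical in-memory model with Unix-second timestamps. The strict "older than" comparison is an assumption, and neither function is part of the library.

```rust
use std::collections::{BTreeSet, HashMap};

/// `heartbeats` maps registered worker id -> last heartbeat (Unix seconds);
/// `lock_holders` lists worker ids found on locked jobs or queues.
fn stale_worker_ids(
    heartbeats: &HashMap<String, u64>,
    lock_holders: &[String],
    now: u64,
    sweep_threshold_secs: u64,
) -> BTreeSet<String> {
    let mut stale = BTreeSet::new();
    // Registered workers that stopped heartbeating.
    for (id, last) in heartbeats {
        if now.saturating_sub(*last) > sweep_threshold_secs {
            stale.insert(id.clone());
        }
    }
    // Orphan locks: held by ids that are not registered at all.
    for id in lock_holders {
        if !heartbeats.contains_key(id) {
            stale.insert(id.clone());
        }
    }
    stale
}

/// Counts locks held by stale workers; only releases them when not a dry run.
fn sweep(locks: &mut [Option<String>], stale: &BTreeSet<String>, dry_run: bool) -> usize {
    let mut recovered = 0;
    for lock in locks.iter_mut() {
        if lock.as_ref().map_or(false, |w| stale.contains(w)) {
            recovered += 1;
            if !dry_run {
                *lock = None;
            }
        }
    }
    recovered
}

fn main() {
    let mut heartbeats = HashMap::new();
    heartbeats.insert("worker-a".to_string(), 1_000);
    heartbeats.insert("worker-b".to_string(), 1_250);
    let holders = vec!["worker-a".to_string(), "worker-x".to_string()];
    let stale = stale_worker_ids(&heartbeats, &holders, 1_400, 300);
    let mut locks = vec![Some("worker-a".to_string()), Some("worker-b".to_string())];
    println!("stale: {stale:?}, would recover: {}", sweep(&mut locks, &stale, true));
}
```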
force_unlock_workers is a direct unlock tool for known worker ids:
utils
.force_unlock_workers(&["graphile_worker_deadbeef"])
.await?;
It clears locks for jobs and queues held by those workers while leaving other workers' locks alone.
Migrations
Operator processes can run schema migrations through utilities:
utils.migrate().await?;
This is the same migration path exposed by the command-line tool.
Command-Line Operations
The graphile-worker binary wraps the same operational concepts for shell
workflows. It connects with --database-url or DATABASE_URL and defaults to
the graphile_worker schema.
graphile-worker --database-url postgres://postgres:postgres@localhost/postgres migrate
DATABASE_URL=postgres://postgres:postgres@localhost/postgres graphile-worker add send_email --payload '{"to":"user@example.com"}'
graphile-worker list --state ready
graphile-worker complete 123 124
graphile-worker fail 125 --reason "invalid payload"
graphile-worker reschedule 126 --run-at 2026-01-02T03:04:05Z
graphile-worker remove cli-job-key
graphile-worker cleanup
graphile-worker force-unlock graphile_worker_deadbeef
graphile-worker sweep-stale-workers --sweep-threshold 5m --recovery-delay 30s
graphile-worker sweep-stale-workers --dry-run
Use the Rust API from application code and purpose-built operator services. Use the CLI for ad hoc inspection, maintenance, and recovery tasks.
Operations
This section covers the surfaces you use after Graphile Worker RS is running in an application: schema migrations, the command-line tool, the admin UI, worker shutdown, recovery, and production readiness checks.
Use these pages when you are preparing a deployment or responding to a queue incident:
- CLI for adding, listing, completing, failing, rescheduling, removing, cleaning up, unlocking, and inspecting jobs.
- Admin UI for browser-based queue inspection and controlled maintenance actions.
- Migrations for installing and updating the PostgreSQL schema.
- Deployment for running workers outside local development.
- Observability for production visibility.
Operational surfaces
Graphile Worker RS exposes three main operational entry points.
Worker runtime
The worker process owns job execution. Production configuration normally starts with a PostgreSQL pool, a schema name, registered task handlers, concurrency, and shutdown behavior:
use graphile_worker::{WorkerOptions, WorkerShutdownConfig};
use std::time::Duration;
let shutdown = WorkerShutdownConfig::default()
.listen_os_shutdown_signals(true)
.grace_period(Duration::from_secs(5))
.interrupted_job_retry_delay(Duration::from_secs(30));
let worker = WorkerOptions::default()
.concurrency(5)
.schema("graphile_worker")
.worker_shutdown(shutdown)
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
worker.run().await?;
If your host application already owns shutdown, disable the built-in OS signal listeners and pass the host shutdown future to the worker. The worker still handles graceful draining, local queue release, batcher flushes, and configured grace-period behavior for in-flight jobs.
Command-line tool
The installed binary is graphile-worker. It connects through
--database-url or DATABASE_URL and uses the graphile_worker schema by
default.
graphile-worker --database-url postgres://postgres:postgres@localhost/postgres migrate
DATABASE_URL=postgres://postgres:postgres@localhost/postgres graphile-worker add send_email --payload '{"to":"user@example.com"}'
graphile-worker list --state ready
graphile-worker show 123
graphile-worker complete 123 124
graphile-worker fail 125 --reason "invalid payload"
graphile-worker reschedule 126 --run-at 2026-01-02T03:04:05Z
graphile-worker remove cli-job-key
graphile-worker cleanup
graphile-worker force-unlock graphile_worker_deadbeef
graphile-worker stats
graphile-worker queues
graphile-worker workers
For stale worker recovery, dry-run first so you can see which workers would be recovered:
graphile-worker sweep-stale-workers --dry-run
graphile-worker sweep-stale-workers --sweep-threshold 5m --recovery-delay 30s
Admin UI
The native admin UI is an Axum server. It serves the browser UI and protected API routes for:
- session metadata
- queue overview, stats, queues, locked workers, and active workers
- job listing and single-job inspection
- adding raw jobs
- completing, permanently failing, running now, and rescheduling jobs
- removing a job by key
- maintenance actions: migrate, cleanup, force unlock, and sweep stale workers
When embedding the admin UI directly, AdminServerConfig defaults to
127.0.0.1:4000, uses the graphile_worker schema, enables write actions, and
uses Basic auth with a random password for the admin user. The
graphile-worker admin CLI command overrides the listen default to
127.0.0.1:5678. Authentication modes available in the server configuration are
Basic, Bearer token, custom header token, and no auth. No-auth mode is rejected
on non-loopback listen addresses.
Run write-capable admin UI instances behind an access-controlled boundary. The
server adds no-store cache headers for dynamic routes, caches static assets for
one hour, sets X-Content-Type-Options: nosniff, denies framing, uses
Referrer-Policy: no-referrer, and applies a Content Security Policy. API
write methods also require the admin CSRF token.
Use read-only mode for shared inspection surfaces. In read-only mode, write
routes return 403 instead of mutating jobs or running maintenance.
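The write protections described above can be pictured as a small request guard. Read-only mode rejects writes with 403, and writes must carry the CSRF token. The header name x-graphile-worker-admin-csrf comes from the admin UI documentation. The guard function, the Rejection enum, and the order of the two checks are assumptions made for illustration. A production check would also compare tokens in constant time.

```rust
/// Why a request was refused (hypothetical; not the server's real error type).
#[derive(Debug, PartialEq)]
enum Rejection {
    /// A write was attempted while the server is read-only (documented as 403).
    ReadOnly,
    /// A write was attempted without a matching CSRF token.
    BadCsrf,
}

const CSRF_HEADER: &str = "x-graphile-worker-admin-csrf";

fn is_write(method: &str) -> bool {
    !matches!(method, "GET" | "HEAD" | "OPTIONS")
}

/// Reads always pass; writes need read-write mode and a matching CSRF header.
fn guard(method: &str, headers: &[(&str, &str)], read_only: bool, csrf_token: &str) -> Result<(), Rejection> {
    if !is_write(method) {
        return Ok(());
    }
    if read_only {
        return Err(Rejection::ReadOnly);
    }
    // HTTP header names are case-insensitive.
    let sent = headers
        .iter()
        .find(|(name, _)| name.eq_ignore_ascii_case(CSRF_HEADER))
        .map(|(_, value)| *value);
    if sent == Some(csrf_token) { Ok(()) } else { Err(Rejection::BadCsrf) }
}

fn main() {
    let headers = [(CSRF_HEADER, "token-123")];
    println!("{:?}", guard("POST", &headers, false, "token-123"));
    println!("{:?}", guard("POST", &headers, true, "token-123"));
}
```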
Migrations
Migrations install and update the Graphile Worker PostgreSQL schema. The migration runner:
- creates the schema and migrations table if they are missing
- checks that PostgreSQL is version 12 or newer
- runs pending migrations in order inside transactions
- records each migration id and whether it is breaking
- rejects a database revision that includes a breaking migration newer than the current crate supports
- warns, but continues, when the database has a newer non-breaking revision than the current crate supports
Run migrations before starting workers after deploys that may include schema changes:
graphile-worker migrate
Migration 11 has a specific safety check: if locked jobs are present and
PostgreSQL returns error code 22012, migration fails with guidance to shut
workers down cleanly and unlock locked jobs and queues before retrying.
Recovery and locked jobs
Worker recovery is opt-in. When enabled, workers register heartbeat rows in
PostgreSQL and refresh last_heartbeat_at on a configured interval. A sweeper
can recover jobs held by workers that stopped heartbeating, and jobs or queues
locked by worker ids that are no longer registered.
use graphile_worker::{WorkerOptions, WorkerRecoveryConfig};
use std::time::Duration;
let recovery = WorkerRecoveryConfig::default()
.enabled(true)
.heartbeat_interval(Duration::from_secs(30))
.sweep_interval(Duration::from_secs(60))
.sweep_threshold(Duration::from_secs(300))
.recovery_delay(Duration::from_secs(30));
let worker = WorkerOptions::default()
.worker_recovery(recovery)
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
Recovered jobs are unlocked, their attempt count is decremented back, their
queue lock is released, and run_at is delayed by the configured recovery
delay. Sweeps use a transaction-scoped PostgreSQL advisory lock so only one
sweeper performs recovery at a time.
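The per-job effect of recovery is modeled below. The sketch assumes run_at is set to "now plus the recovery delay", which matches the CLI description of the delay as the time before recovered jobs are eligible to run again. The LockedJob struct and the recover function are hypothetical, and the advisory lock is not modeled.

```rust
// Hypothetical job row used only for this sketch; times are Unix seconds.
#[derive(Debug, PartialEq)]
struct LockedJob {
    locked_by: Option<String>,
    queue_locked: bool,
    attempts: i16,
    run_at: u64,
}

/// Unlocks the job and its queue, hands back the attempt that was consumed
/// when the job was claimed, and delays the next run by the recovery delay.
fn recover(job: &mut LockedJob, now: u64, recovery_delay_secs: u64) {
    job.locked_by = None;
    job.queue_locked = false;
    job.attempts = (job.attempts - 1).max(0);
    job.run_at = now + recovery_delay_secs;
}

fn main() {
    let mut job = LockedJob { locked_by: Some("graphile_worker_deadbeef".to_string()), queue_locked: true, attempts: 3, run_at: 100 };
    recover(&mut job, 1_000, 30);
    println!("{job:?}");
}
```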
Manual recovery is available even when automatic recovery is disabled:
use graphile_worker::SweepStaleWorkersOptions;
use std::time::Duration;
let result = worker
.create_utils()
.sweep_stale_workers(SweepStaleWorkersOptions {
sweep_threshold: Some(Duration::from_secs(300)),
recovery_delay: Some(Duration::from_secs(30)),
dry_run: true,
..Default::default()
})
.await?;
println!("Would recover workers: {:?}", result.worker_ids);
Use force-unlock only when you have identified worker ids that should no
longer own locks:
graphile-worker workers
graphile-worker force-unlock graphile_worker_deadbeef
Local queue considerations
The local queue reduces database round trips by batch-fetching jobs and caching them in the worker process. It has operational implications:
- unclaimed jobs are returned to PostgreSQL on shutdown or when the local queue TTL expires
- refetch delay can reduce thundering-herd behavior when queues are empty
- cache size and refill threshold should be chosen with worker concurrency and database load in mind
use graphile_worker::{LocalQueueConfig, RefetchDelayConfig, WorkerOptions};
use std::time::Duration;
let worker = WorkerOptions::default()
.local_queue(
LocalQueueConfig::default()
.with_size(100)
.with_ttl(Duration::from_secs(300))
.with_refetch_delay(
RefetchDelayConfig::default()
.with_duration(Duration::from_millis(100))
.with_threshold(10),
),
)
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
Production readiness checklist
Before sending production traffic to workers, verify the following:
- PostgreSQL is version 12 or newer.
- The database URL is configured through DATABASE_URL or an equivalent application secret.
- Migrations are run before workers start processing jobs.
- All deployed workers agree on the Graphile Worker schema name.
- Worker task handlers are registered for every task identifier you enqueue.
- Concurrency, PostgreSQL pool size, and local queue size are sized together.
- Shutdown behavior is wired into your process manager or host application.
- Recovery is enabled when the deployment model can leave jobs locked after crashes, network partitions, forced aborts, or orchestrator shutdowns.
- Admin UI access uses Basic, Bearer, or header auth when exposed beyond loopback.
- Admin UI read-only mode is used for inspection-only deployments.
- Maintenance commands such as cleanup, force-unlock, and sweep-stale-workers are restricted to operators who can safely mutate queue state.
- Stale-worker sweeps are dry-run before recovery during incident response.
- Cleanup policy is understood before permanently failed jobs, task identifiers, or job queues are garbage-collected.
- Observability covers worker startup, job failures, locked workers, queue backlog, and migration failures.
CLI
The graphile-worker binary provides operational access to a Graphile Worker
PostgreSQL database. It can run migrations, add and inspect jobs, mutate job
state, run maintenance tasks, print queue and worker summaries, and serve the
embedded admin UI.
Most commands mutate the configured schema directly. Use the same database URL and schema that your workers use.
Connection Options
Every command accepts these global options:
graphile-worker \
--database-url postgres://postgres:postgres@localhost/postgres \
--schema graphile_worker \
--max-connections 5 \
--json \
stats
| Option | Default | Description |
|---|---|---|
| --database-url | DATABASE_URL | PostgreSQL connection URL. The CLI exits if neither is set. |
| --schema | GRAPHILE_WORKER_SCHEMA, then graphile_worker | Graphile Worker schema name. |
| --max-connections | 5 | Maximum PostgreSQL connections used by the CLI pool. |
| --json | disabled | Print machine-readable JSON where the command supports it. |
Migrations
Run migrations before using a fresh database:
graphile-worker --database-url "$DATABASE_URL" migrate
With JSON output:
graphile-worker --database-url "$DATABASE_URL" --json migrate
The JSON form prints:
{
"migrated": true
}
Add Jobs
Add a job by task identifier. If no payload is provided, the payload defaults to an empty JSON object.
graphile-worker add send_email
Pass JSON directly:
graphile-worker add send_email \
--payload '{"to":"user@example.com"}'
Read JSON from a file:
graphile-worker add send_email \
--payload-file payload.json
Set common scheduling and routing fields:
graphile-worker add send_email \
--payload '{"to":"user@example.com"}' \
--queue emails \
--run-at 2026-01-02T03:04:05Z \
--max-attempts 5 \
--priority 10 \
--flag transactional
--run-at accepts an RFC 3339 timestamp or now. Lower priority values run
sooner.
Use job keys for deduplication or replacement:
graphile-worker add send_email \
--key user-123-welcome \
--job-key-mode replace
--job-key-mode requires --key. Supported modes are:
| Mode | CLI value |
|---|---|
| Replace | replace |
| Preserve run time | preserve-run-at |
| Unsafe dedupe | unsafe-dedupe |
--payload and --payload-file cannot be used together. Payload input must be
valid JSON.
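The argument rules for add can be expressed as a small validation function. These rules are the three supported key-mode values, the requirement that --job-key-mode has a --key, and the exclusivity of the two payload flags. The JobKeyMode enum here is a local type for the sketch, not the library's type. The function names and error strings are made up, and JSON validation of the payload is not modeled.

```rust
// Local enum for this sketch only.
#[derive(Debug, Clone, Copy, PartialEq)]
enum JobKeyMode {
    Replace,
    PreserveRunAt,
    UnsafeDedupe,
}

fn parse_job_key_mode(value: &str) -> Result<JobKeyMode, String> {
    match value {
        "replace" => Ok(JobKeyMode::Replace),
        "preserve-run-at" => Ok(JobKeyMode::PreserveRunAt),
        "unsafe-dedupe" => Ok(JobKeyMode::UnsafeDedupe),
        other => Err(format!("unknown job key mode: {other}")),
    }
}

/// Checks the documented combinations of `add` arguments.
fn validate_add_args(
    key: Option<&str>,
    mode: Option<&str>,
    payload: Option<&str>,
    payload_file: Option<&str>,
) -> Result<Option<JobKeyMode>, String> {
    if payload.is_some() && payload_file.is_some() {
        return Err("--payload and --payload-file cannot be used together".to_string());
    }
    match (key, mode) {
        (None, Some(_)) => Err("--job-key-mode requires --key".to_string()),
        (Some(_), Some(m)) => parse_job_key_mode(m).map(Some),
        (_, None) => Ok(None),
    }
}

fn main() {
    println!("{:?}", validate_add_args(Some("user-123-welcome"), Some("replace"), None, None));
    println!("{:?}", validate_add_args(None, Some("replace"), None, None));
}
```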
Inspect Jobs
List jobs:
graphile-worker list
Filter by state, task, queue, and pagination:
graphile-worker list \
--state ready \
--identifier send_email \
--queue emails \
--limit 25 \
--offset 0
Supported states are all, ready, scheduled, locked, and failed.
--limit and --offset must be greater than or equal to zero.
Show one job by id:
graphile-worker show 123
Human-readable list output is tab-separated. Human-readable show output
includes the job payload pretty-printed as JSON. Add --json to either command
for structured output.
Mutate Jobs
Complete one or more jobs:
graphile-worker complete 123 124
Permanently fail jobs:
graphile-worker fail 125 --reason "invalid payload"
If --reason is omitted, the CLI uses the reason "Manually marked as failed".
Reschedule jobs or update retry metadata:
graphile-worker reschedule 126 --run-at 2026-01-02T03:04:05Z
graphile-worker reschedule 127 --now
graphile-worker reschedule 128 --priority 7
graphile-worker reschedule 129 --attempts 1 --max-attempts 5
reschedule requires at least one of --now, --run-at, --priority,
--attempts, or --max-attempts.
Remove a job by job key:
graphile-worker remove user-123-welcome
Overview Commands
Print queue-wide job counts:
graphile-worker stats
List queues and queue lock state:
graphile-worker queues
List worker ids that currently hold job or queue locks:
graphile-worker workers
These commands also support --json.
Maintenance
Run cleanup tasks:
graphile-worker cleanup
When no task is passed, all cleanup tasks run. To run selected tasks:
graphile-worker cleanup delete-permanently-failed-jobs
graphile-worker cleanup gc-task-identifiers gc-job-queues
Force unlock jobs and queues locked by worker ids:
graphile-worker force-unlock graphile_worker_deadbeef
graphile-worker force-unlock worker-a worker-b
Recover stale workers and orphan locks:
graphile-worker sweep-stale-workers
graphile-worker sweep-stale-workers --sweep-threshold 5m --recovery-delay 30s
--sweep-threshold is the time since last heartbeat before a worker is deemed
inactive. --recovery-delay is the delay before recovered jobs are eligible to
run again. Durations may be plain seconds or use second, minute, or hour units,
for example 300, 30s, 5m, or 2h.
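A parser for the duration forms shown above might look like the following. This is a sketch, not the CLI's own parser. It only accepts the single-letter suffixes s, m, and h from the examples, and it treats a bare number as seconds. Whether the real CLI also accepts longer unit names is not covered here.

```rust
use std::time::Duration;

/// Parses "300", "30s", "5m", or "2h" into a Duration (illustrative sketch).
fn parse_duration(input: &str) -> Result<Duration, String> {
    let input = input.trim();
    // The suffixes are ASCII, so slicing off the last byte is safe.
    let (digits, multiplier) = match input.chars().last() {
        Some('s') => (&input[..input.len() - 1], 1),
        Some('m') => (&input[..input.len() - 1], 60),
        Some('h') => (&input[..input.len() - 1], 3_600),
        _ => (input, 1),
    };
    let value: u64 = digits
        .parse()
        .map_err(|_| format!("invalid duration: {input:?}"))?;
    value
        .checked_mul(multiplier)
        .map(Duration::from_secs)
        .ok_or_else(|| format!("duration overflows: {input:?}"))
}

fn main() {
    for raw in ["300", "30s", "5m", "2h", "5d"] {
        println!("{raw} -> {:?}", parse_duration(raw));
    }
}
```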
Use dry-run mode to see which stale workers would be recovered without unlocking or rescheduling jobs:
graphile-worker sweep-stale-workers --dry-run
Admin UI
Serve the embedded admin UI and JSON management API:
graphile-worker admin
By default it listens on 127.0.0.1:5678 and uses HTTP Basic auth with username
admin. If no password is provided, the CLI generates one and prints it at
startup.
Useful options:
graphile-worker admin \
--listen 127.0.0.1:5678 \
--auth basic \
--username admin \
--password "$GRAPHILE_WORKER_ADMIN_PASSWORD"
Authentication modes are basic, bearer, header, and none.
| Mode | Options and environment |
|---|---|
| basic | --username, --password, or GRAPHILE_WORKER_ADMIN_PASSWORD |
| bearer | --bearer-token or GRAPHILE_WORKER_ADMIN_BEARER_TOKEN |
| header | --header-name, --header-token, or GRAPHILE_WORKER_ADMIN_HEADER_TOKEN |
| none | Allowed only when --listen uses a loopback address |
For read-only operation, disable mutating admin actions:
graphile-worker admin --read-only
Operational Caveats
- The CLI opens a PostgreSQL connection pool for every command, including the long-running admin server. Tune --max-connections for operational environments.
- --json is intended for automation. Human-readable tables are tab-separated and may be easier to inspect in a terminal.
- complete, fail, reschedule, remove, cleanup, force-unlock, sweep-stale-workers, and migrate change database state.
- force-unlock unlocks by worker id. When recovering from stale workers, prefer running sweep-stale-workers --dry-run first, then run it without --dry-run after checking the listed worker ids.
- --auth none for the admin UI is rejected unless the listen address is loopback.
Admin UI
Graphile Worker RS includes an embedded admin UI for inspecting queues, jobs, and workers, plus a JSON API for the same operations. The implementation is split across three crates:
- graphile_worker_admin_api defines shared request and response types, plus SQLx-backed read queries when built with the sqlx feature.
- graphile_worker_admin_ui serves the native Axum application: the HTML page, embedded CSS, JavaScript, and WASM, and the JSON API routes.
- graphile_worker_admin_ui_client contains the WASM client that runs in the browser.
The CLI uses these crates to expose the graphile-worker admin command.
Starting the Server
The admin command starts a native HTTP server and connects it to the same
Postgres schema and WorkerUtils used by the rest of the CLI.
graphile-worker admin
By default it listens on 127.0.0.1:5678, uses HTTP Basic authentication with
username admin, and generates a password when none is provided.
Useful options include:
graphile-worker admin --listen 127.0.0.1:5678
graphile-worker admin --auth basic --username admin
graphile-worker admin --auth bearer
graphile-worker admin --auth header --header-name x-graphile-worker-admin-token
graphile-worker admin --read-only
The CLI also reads these environment variables for configured secrets:
GRAPHILE_WORKER_ADMIN_PASSWORD="<password>" graphile-worker admin --auth basic
GRAPHILE_WORKER_ADMIN_BEARER_TOKEN="<token>" graphile-worker admin --auth bearer
GRAPHILE_WORKER_ADMIN_HEADER_TOKEN="<token>" graphile-worker admin --auth header
When a Basic password, bearer token, or header token is generated, the CLI prints it at startup. Configured secrets are not printed.
Native Embedding
Applications can build the Axum server directly with
graphile_worker_admin_ui::AdminServerConfig and graphile_worker_admin_ui::serve.
use graphile_worker::{Schema, WorkerUtils};
use graphile_worker_admin_ui::{AdminAuthConfig, AdminServerConfig};
use sqlx::PgPool;
async fn serve_admin(pool: PgPool, utils: WorkerUtils) -> anyhow::Result<()> {
let config = AdminServerConfig::builder(pool, utils)
.schema(Schema::default())
.schema_name("graphile_worker")
.listen_addr("127.0.0.1:5678".parse()?)
.auth(AdminAuthConfig::basic_with_random_password("admin"))
.read_only(false)
.build()?;
graphile_worker_admin_ui::serve(config).await?;
Ok(())
}
When embedding the admin UI directly, the config builder defaults to:
- schema: Schema::default()
- schema name: graphile_worker
- listen address: 127.0.0.1:4000
- authentication: generated Basic auth for username admin
- mode: read-write
Authentication
The server supports four auth modes:
| Mode | Behavior |
|---|---|
| basic | Requires HTTP Basic credentials. This is the default CLI mode. |
| bearer | Requires Authorization: Bearer <token>. |
| header | Requires a configured header name whose value matches the token. |
| none | Requires no auth, but is only allowed on loopback listen addresses. |
Both the CLI auth builder and the native config validation reject unauthenticated servers bound to non-loopback addresses.
For Basic auth, the server also applies page authentication to the browser page
itself. API routes are authenticated in every mode except none.
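The loopback rule for unauthenticated servers can be checked with the standard library alone. The sketch below uses std::net::SocketAddr and IpAddr::is_loopback. The AuthMode enum and the validate_listen function are hypothetical and do not reproduce the server's configuration types.

```rust
use std::net::SocketAddr;

// Local enum for this sketch; NoAuth corresponds to the `none` mode.
#[derive(Debug, Clone, Copy, PartialEq)]
enum AuthMode {
    Basic,
    Bearer,
    Header,
    NoAuth,
}

/// Rejects unauthenticated servers bound to anything but a loopback address.
fn validate_listen(auth: AuthMode, addr: SocketAddr) -> Result<(), String> {
    if auth == AuthMode::NoAuth && !addr.ip().is_loopback() {
        return Err(format!("refusing to serve without auth on non-loopback address {addr}"));
    }
    Ok(())
}

fn main() {
    let public: SocketAddr = "0.0.0.0:5678".parse().expect("valid socket address");
    for mode in [AuthMode::Basic, AuthMode::Bearer, AuthMode::Header, AuthMode::NoAuth] {
        println!("{mode:?} on {public}: {:?}", validate_listen(mode, public));
    }
}
```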
CSRF and Security Headers
The server generates a CSRF token when it builds application state. Every
mutating API request must include that token in the
x-graphile-worker-admin-csrf header. The browser client reads the header name
and token from the rendered page/session data and sends it on writes.
The server adds security headers to all responses:
- Cache-Control: no-store for pages and API responses
- Cache-Control: public, max-age=3600 for /assets/* and /favicon.ico
- X-Content-Type-Options: nosniff
- X-Frame-Options: DENY
- Referrer-Policy: no-referrer
- a Content Security Policy limited to same-origin assets and API calls
Routes
The native server exposes the browser page, embedded assets, and JSON API from one Axum router.
| Route | Method | Purpose |
|---|---|---|
| / | GET | Render the admin page. |
| /assets/admin.css | GET | Embedded stylesheet. |
| /assets/admin.js | GET | Embedded JavaScript entrypoint. |
| /assets/admin_ui.js | GET | WASM bindgen JavaScript. |
| /assets/admin_ui_bg.wasm | GET | WASM client module. |
| /favicon.ico | GET | Embedded SVG favicon. |
| /api/session | GET | Return schema, read-only state, CSRF header name, and public auth summary. |
| /api/overview | GET | Return job counts, queues, locked workers, and active workers. |
| /api/jobs | GET | List jobs. |
| /api/jobs | POST | Add a job. |
| /api/jobs/{id} | GET | Fetch one listed job. |
| /api/jobs/action | POST | Complete, fail, run now, or reschedule selected jobs. |
| /api/jobs/remove-by-key | POST | Remove a job by key. |
| /api/maintenance | POST | Run maintenance operations. |
Job Visibility
GET /api/jobs accepts these query parameters:
- state: all, ready, scheduled, locked, or failed
- identifier: task identifier filter
- queue: queue name filter
- search: text search filter
- limit: result limit, capped by the native route at 500
- offset: result offset
The response contains ListedJob rows with job metadata such as id, task
identifier, queue name, payload, priority, run time, attempts, last error, key,
lock owner, revision, flags, and availability.
curl -u "admin:${GRAPHILE_WORKER_ADMIN_PASSWORD}" \
"http://127.0.0.1:5678/api/jobs?state=failed&limit=50"
Job Mutations
Mutating job routes are disabled when the server runs with --read-only or
AdminServerConfig::read_only(true).
Adding a job uses the shared AddJobRequest contract:
{
"identifier": "send_email",
"payload": { "user_id": 123 },
"queue": "mailers",
"max_attempts": 25,
"key": "email:123",
"job_key_mode": "replace",
"priority": 0,
"flags": ["transactional"]
}
Supported job key modes are replace, preserve-run-at, and
unsafe-dedupe. When job_key_mode is set, key is required.
POST /api/jobs/action accepts an action and job ids:
{
"action": "reschedule",
"ids": [42, 43],
"run_at": "2026-01-01T00:00:00Z",
"priority": 10,
"attempts": 0,
"max_attempts": 25
}
Supported actions are:
- complete
- fail
- run-now
- reschedule
fail uses the provided reason when present and non-empty. Otherwise it uses
the admin UI default reason. reschedule requires at least one of run_at,
priority, attempts, or max_attempts.
Jobs can also be removed by key:
{
"key": "email:123"
}
Maintenance
POST /api/maintenance exposes selected WorkerUtils maintenance operations.
It is also disabled in read-only mode.
Supported actions are:
- migrate
- cleanup
- force-unlock
- sweep-stale-workers
Cleanup can run all cleanup tasks by omitting cleanup_tasks, or a selected
subset:
{
"action": "cleanup",
"cleanup_tasks": [
"delete-permanently-failed-jobs",
"gc-task-identifiers",
"gc-job-queues"
]
}
Force unlock requires at least one worker id:
{
"action": "force-unlock",
"worker_ids": ["worker-1"]
}
Stale worker sweeping accepts optional thresholds in seconds and supports dry runs:
{
"action": "sweep-stale-workers",
"dry_run": true,
"sweep_threshold_secs": 300,
"recovery_delay_secs": 60
}
Assets and Client Behavior
The native crate embeds the admin stylesheet, JavaScript, WASM bindgen output,
WASM module, and SVG favicon into the server binary. The browser client uses
same-origin requests, sends Accept: application/json, includes credentials,
and adds the CSRF header on writes.
For bearer and header auth modes, the browser client sends the token on API requests after it has been entered in the UI. For Basic auth, the browser uses same-origin credentials from the HTTP auth challenge.
Migrations
Graphile Worker RS stores its database objects in a PostgreSQL schema and tracks
the installed schema revision in a migrations table inside that schema. The
graphile_worker_migrations crate owns this setup and exposes the migration
runner used to install or update the schema.
The public entry point is graphile_worker_migrations::migrate(database, schema).
It accepts a database connection/executor value that can be converted into the
crate database type, plus the schema name to manage.
use graphile_worker_migrations::migrate;
migrate(&database, "graphile_worker").await?;
What migrate does
On each run, the migration runner reads:
- the PostgreSQL server_version_num
- the latest row in <schema>.migrations
- the latest row in <schema>.migrations marked as a breaking migration
If <schema>.migrations does not exist, the runner installs the base schema
tracking objects first:
create schema if not exists graphile_worker;
create table if not exists graphile_worker.migrations (
id int primary key,
ts timestamptz default now() not null,
breaking boolean not null default false
);
After that, it runs every packaged migration whose number is greater than the
latest recorded migration id. Each migration runs in its own database
transaction. When the SQL completes, the runner inserts a row into the
migrations table with the migration id and whether that migration is marked as
breaking, then commits the transaction.
Running migrate again after the schema is current is expected to be harmless:
the runner sees the latest recorded revision and skips already-applied
migrations.
Packaged revisions
Migration SQL is bundled in the graphile_worker_migrations crate under
src/sql. At runtime the crate loads those files into the
GRAPHILE_WORKER_MIGRATIONS registry in revision order.
The current package contains revisions 1 through 20. The revisions marked as
breaking in the registry are:
1, 3, 11, 13, 14, 16
The runner supports taking over from an existing Graphile Worker schema as long
as the migrations table accurately records the already-applied revision. For
example, if revision 1 is already present, migrate starts at revision 2.
Startup guidance
Run migrations before starting workers that enqueue, poll, or execute jobs:
use graphile_worker_migrations::migrate;
async fn boot(database: &Database) -> Result<(), Box<dyn std::error::Error>> {
migrate(database, "graphile_worker").await?;
// Start worker processes only after the schema is installed and current.
Ok(())
}
This is especially important around breaking revisions. The runner refuses to continue if the database has a newer breaking migration than the currently running package supports. For example, a database whose migrations table records a future breaking revision cannot safely be used by an older worker binary.
If the database records a future non-breaking revision, the runner logs a warning and continues. That warning means the database schema is newer than the current package knows about, so all running Graphile Worker RS versions should be checked for compatibility.
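The decision the runner makes on each start can be sketched as a pure function. It uses the packaged revisions 1 through 20 and the breaking list above. This is a hypothetical model of the documented behavior, not the crate's code. The Plan enum and the plan function are illustrative names, and the actual SQL execution and per-migration transactions are not modeled.

```rust
/// Breaking revisions in the packaged registry, per the documented list.
const BREAKING: [u32; 6] = [1, 3, 11, 13, 14, 16];
const LATEST_PACKAGED: u32 = 20;

#[derive(Debug, PartialEq)]
enum Plan {
    /// Apply these (revision, breaking) pairs in order, one transaction each.
    Apply(Vec<(u32, bool)>),
    /// The database is ahead only by non-breaking revisions: warn and continue.
    WarnNewerSchema(u32),
}

fn plan(latest_recorded: Option<u32>, latest_breaking_recorded: Option<u32>) -> Result<Plan, String> {
    // A newer breaking revision means this package cannot safely use the schema.
    if let Some(breaking) = latest_breaking_recorded {
        if breaking > LATEST_PACKAGED {
            return Err(format!(
                "database has breaking migration {breaking}; this package supports up to {LATEST_PACKAGED}"
            ));
        }
    }
    // A missing migrations table is treated as "nothing applied yet".
    let latest = latest_recorded.unwrap_or(0);
    if latest > LATEST_PACKAGED {
        return Ok(Plan::WarnNewerSchema(latest));
    }
    let pending: Vec<(u32, bool)> = (latest + 1..=LATEST_PACKAGED)
        .map(|id| (id, BREAKING.contains(&id)))
        .collect();
    Ok(Plan::Apply(pending))
}

fn main() {
    println!("{:?}", plan(Some(1), Some(1)));
    println!("{:?}", plan(Some(21), Some(21)));
}
```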
PostgreSQL version check
The migration package requires PostgreSQL 12 or newer. It checks
current_setting('server_version_num') and returns an incompatible-version error
for versions below 120000.
On a fresh schema install, the PostgreSQL version is checked before creating the schema and migrations table. On existing schemas, the version reported while reading migration state is checked before pending migrations are applied.
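For PostgreSQL 10 and later, server_version_num encodes the major version times 10000 plus the minor version. The 120000 cutoff therefore means "PostgreSQL 12". The check is sketched below on the raw setting string. The function name and the error wording are hypothetical.

```rust
const MIN_SERVER_VERSION_NUM: u32 = 120_000;

/// Returns the major version if the server is new enough (illustrative sketch).
fn check_server_version(server_version_num: &str) -> Result<u32, String> {
    let num: u32 = server_version_num
        .trim()
        .parse()
        .map_err(|_| format!("unparseable server_version_num: {server_version_num:?}"))?;
    if num < MIN_SERVER_VERSION_NUM {
        return Err(format!(
            "PostgreSQL {} is too old; version 12 or newer is required",
            num / 10_000
        ));
    }
    Ok(num / 10_000)
}

fn main() {
    for raw in ["160002", "110012"] {
        println!("{raw} -> {:?}", check_server_version(raw));
    }
}
```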
Locked jobs during migration 11
Migration 11 has a specific guard for locked jobs. If PostgreSQL returns SQL
state 22012 while applying that migration, the runner returns
MigrateError::LockedJobInMigration11.
Before applying a package version that still needs to run migration 11, stop
workers cleanly and make sure jobs and queues are not left locked. Then rerun
migrate.
Custom schemas
The schema argument controls where the Graphile Worker objects are installed:
migrate(&database, "my_worker_schema").await?;
Packaged migration SQL uses the :GRAPHILE_WORKER_SCHEMA placeholder
internally. The migration executor replaces that placeholder with the escaped
schema name before executing each SQL statement.
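Placeholder substitution is sketched below. This sketch assumes the escaping is standard PostgreSQL identifier quoting, that is, wrapping the name in double quotes and doubling any embedded double quote. The exact escaping used by the crate is not documented here, and quote_ident and render_migration are hypothetical helpers.

```rust
/// Quotes a PostgreSQL identifier: wrap in double quotes, double embedded quotes.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Replaces every occurrence of the placeholder with the quoted schema name.
fn render_migration(sql: &str, schema: &str) -> String {
    sql.replace(":GRAPHILE_WORKER_SCHEMA", &quote_ident(schema))
}

fn main() {
    let sql = "create table :GRAPHILE_WORKER_SCHEMA.example (id int);";
    println!("{}", render_migration(sql, "my_worker_schema"));
}
```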
Deployment
Graphile Worker RS runs as an application process backed by PostgreSQL. A production deployment should treat the worker like any other stateful database client: size the PostgreSQL pool deliberately, run schema migrations before work starts, handle shutdown signals, and expose enough logs or hooks to understand job flow.
Database and Migrations
Each worker needs a PostgreSQL connection, either from an existing database pool or from a connection URL:
let worker = graphile_worker::WorkerOptions::default()
.database_url(&std::env::var("DATABASE_URL")?)
.schema("graphile_worker")
.define_job::<SendEmail>()
.init()
.await?;
If you already manage your own pool, pass it to the worker instead:
let pg_pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(10)
.connect(&std::env::var("DATABASE_URL")?)
.await?;
let worker = graphile_worker::WorkerOptions::default()
.pg_pool(pg_pool)
.schema("graphile_worker")
.define_job::<SendEmail>()
.init()
.await?;
WorkerOptions::init() runs the Graphile Worker migrations for the configured
schema before registering task handlers and returning a runnable worker. The
default schema is graphile_worker; choose a different schema when you want to
isolate worker tables for an application, environment, or tenant.
The CLI can also run migrations explicitly:
graphile-worker --database-url "$DATABASE_URL" migrate
Pool and Concurrency
concurrency controls how many jobs a worker process can run at the same time.
If you do not set it, the worker defaults to the number of logical CPUs on the
host. Set it explicitly in production so the same artifact behaves consistently
across instance sizes.
let worker = graphile_worker::WorkerOptions::default()
.concurrency(8)
.database_url(&database_url)
.max_pg_conn(20)
.define_job::<SendEmail>()
.init()
.await?;
When the worker creates its own pool from database_url, max_pg_conn sets the
maximum number of database connections. If you pass an existing pool with
pg_pool or database, the pool's own configuration is used and
max_pg_conn is ignored.
Use practical sizing:
- CPU-heavy jobs usually need concurrency near the available CPU capacity.
- I/O-heavy jobs can often run with higher concurrency.
- Total database connections are the sum of all worker replicas plus the rest of your application.
- Keep enough pool capacity for polling, job updates, LISTEN/NOTIFY handling, recovery, cron scheduling, and any database work done inside job handlers.
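The budget arithmetic from the list above can be written out directly. The total is the sum of every replica's pool and the application's own connections, plus some headroom. It must fit under the server's max_connections setting. The function names and the separate "reserved" headroom term are assumptions made for illustration.

```rust
/// Connections needed when every replica opens its full pool (illustrative).
fn connection_budget(replicas: u32, max_pg_conn_per_worker: u32, app_connections: u32, reserved: u32) -> u32 {
    replicas * max_pg_conn_per_worker + app_connections + reserved
}

/// Whether the budget fits under PostgreSQL's max_connections setting.
fn fits(server_max_connections: u32, needed: u32) -> bool {
    needed <= server_max_connections
}

fn main() {
    // 4 worker replicas with max_pg_conn(20), 30 app connections, 10 spare.
    let needed = connection_budget(4, 20, 30, 10);
    println!("needed {needed}, fits in 100: {}", fits(100, needed));
}
```

If the budget does not fit, lower max_pg_conn or the replica count, or raise the server limit. Concurrency should then be revisited so that handlers that touch the database do not wait on the pool.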
The worker also polls PostgreSQL, both as a fallback when notifications are missed and to pick up scheduled jobs once they become due. The default poll interval is one second. Lower intervals can improve responsiveness but increase database load:
let worker = graphile_worker::WorkerOptions::default()
.poll_interval(std::time::Duration::from_millis(500))
.define_job::<SendEmail>()
.database_url(&database_url)
.init()
.await?;
Worker Process
A typical production process creates the worker, then calls run():
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let worker = graphile_worker::WorkerOptions::default()
.concurrency(8)
.database_url(&std::env::var("DATABASE_URL")?)
.schema("graphile_worker")
.define_job::<SendEmail>()
.init()
.await?;
worker.run().await?;
Ok(())
}
Register every task this process is allowed to execute with define_job,
define_batch_job, or define_jobs. To split work across specialized worker
deployments, use job flags and add_forbidden_flag so a process skips jobs that
do not belong on that deployment:
let worker = graphile_worker::WorkerOptions::default()
.add_forbidden_flag("high_memory")
.define_job::<SendEmail>()
.database_url(&database_url)
.init()
.await?;
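The filtering rule can be modeled as a set intersection: a worker skips any job that carries at least one of its forbidden flags. This is a standalone sketch of that rule. The is_eligible helper is hypothetical and is not the query graphile_worker actually runs.

```rust
use std::collections::HashSet;

/// Hypothetical model of flag filtering: a job is eligible unless it carries
/// at least one flag from the worker's forbidden set. Jobs without flags are
/// always eligible.
fn is_eligible(job_flags: &[&str], forbidden: &HashSet<&str>) -> bool {
    !job_flags.iter().any(|flag| forbidden.contains(flag))
}
```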
Jobs with the same queue name run in series for that queue. Use queue names when you need per-user, per-account, or per-resource ordering while still allowing other queues to run concurrently.
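The sketch below models that ordering rule with a hypothetical pick_runnable function. It fills free concurrency slots from pending jobs, but never starts a second job for a queue that already has one running. Jobs without a queue name are not serialized.

This is a simplified model of the documented behavior only. The worker's real claiming logic runs inside PostgreSQL.

```rust
use std::collections::HashSet;

/// A pending job in this simplified model: an id and an optional queue name.
struct PendingJob {
    id: u64,
    queue_name: Option<&'static str>,
}

/// Hypothetical scheduler model: pick up to `free_slots` jobs in order, skipping
/// any job whose queue is already busy or was claimed earlier in this round.
fn pick_runnable(pending: &[PendingJob], busy_queues: &HashSet<&str>, free_slots: usize) -> Vec<u64> {
    let mut claimed: HashSet<&str> = HashSet::new();
    let mut picked = Vec::new();
    for job in pending {
        if picked.len() == free_slots {
            break;
        }
        match job.queue_name {
            // Same queue already running or picked: keep it waiting.
            Some(q) if busy_queues.contains(q) || claimed.contains(q) => continue,
            Some(q) => {
                claimed.insert(q);
            }
            // No queue name: runs concurrently with anything.
            None => {}
        }
        picked.push(job.id);
    }
    picked
}
```

With two pending jobs for queue user:42, only the first starts. Jobs for other queues, or with no queue, can fill the remaining slots.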
Shutdown Signals
By default, the worker listens for OS shutdown signals such as SIGINT and SIGTERM. This is the right behavior for most containers and process managers: send a shutdown signal and give the process enough grace time to finish or release in-flight work.
Tune graceful shutdown with WorkerShutdownConfig or the convenience setters:
let worker = graphile_worker::WorkerOptions::default()
.shutdown_grace_period(std::time::Duration::from_secs(30))
.shutdown_interrupted_job_retry_delay(std::time::Duration::from_secs(30))
.define_job::<SendEmail>()
.database_url(&database_url)
.init()
.await?;
If your host application already owns signal handling, disable the built-in OS listeners and pass a future that resolves when shutdown should begin:
let worker = graphile_worker::WorkerOptions::default()
.listen_os_shutdown_signals(false)
.shutdown_signal(on_shutdown())
.define_job::<SendEmail>()
.database_url(&database_url)
.init()
.await?;
Graphile Worker still handles graceful draining after the shutdown signal is received.
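One way to reason about the two shutdown durations is to classify in-flight jobs by how much work they have left when the signal arrives. The sketch assumes, based on the option names, that jobs still running when shutdown_grace_period ends are released. They then become runnable again after shutdown_interrupted_job_retry_delay. DrainOutcome and drain are hypothetical.

```rust
use std::time::Duration;

/// Outcome of a graceful drain in this simplified model.
#[derive(Debug, PartialEq)]
enum DrainOutcome {
    /// The job finished before the grace period ran out.
    Completed,
    /// The job was still running; it is released and becomes runnable again
    /// after `retry_in` (modeled on shutdown_interrupted_job_retry_delay).
    Interrupted { retry_in: Duration },
}

/// Hypothetical model: classify each in-flight job by the time it still needs.
fn drain(remaining: &[Duration], grace_period: Duration, retry_delay: Duration) -> Vec<DrainOutcome> {
    remaining
        .iter()
        .map(|&left| {
            if left <= grace_period {
                DrainOutcome::Completed
            } else {
                DrainOutcome::Interrupted { retry_in: retry_delay }
            }
        })
        .collect()
}
```

Whatever values you choose, keep the orchestrator's own grace period longer than shutdown_grace_period. In Kubernetes, for example, that is terminationGracePeriodSeconds. Otherwise the process can be killed mid-drain.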
Recovery
Worker recovery is opt-in. Enable it when a job may be left locked after a process crash, network partition, forced abort, or orchestrator timeout. When enabled, workers heartbeat in PostgreSQL and a sweeper recovers jobs owned by workers that have stopped heartbeating.
let worker = graphile_worker::WorkerOptions::default()
.heartbeat_interval(std::time::Duration::from_secs(30))
.sweep_interval(std::time::Duration::from_secs(60))
.sweep_threshold(std::time::Duration::from_secs(300))
.recovery_delay(std::time::Duration::from_secs(30))
.define_job::<SendEmail>()
.database_url(&database_url)
.init()
.await?;
Recovery sweeps use a PostgreSQL advisory lock so only one sweeper performs the
recovery work at a time. Recovered jobs are unlocked, their attempt count is
decremented back, their queue lock is released, and their run_at is delayed by
the configured recovery delay.
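The recovery steps above can be modeled on a simplified job row. In this sketch, a worker is stale once its last heartbeat is older than sweep_threshold. Recovering one of its jobs does four things:

- unlocks the job;
- gives back one attempt;
- releases the queue lock;
- pushes run_at out by recovery_delay.

LockedJob and its field names are hypothetical. The advisory lock that keeps sweeps exclusive is not modeled.

```rust
use std::time::{Duration, SystemTime};

/// Simplified view of a locked job row (hypothetical field names).
#[derive(Debug, PartialEq)]
struct LockedJob {
    attempts: u32,
    locked_by: Option<String>,
    queue_locked: bool,
    run_at: SystemTime,
}

/// A worker is stale once its last heartbeat is older than the threshold.
/// A heartbeat timestamp in the future is treated as fresh.
fn is_stale(last_heartbeat: SystemTime, now: SystemTime, sweep_threshold: Duration) -> bool {
    now.duration_since(last_heartbeat)
        .map_or(false, |age| age > sweep_threshold)
}

/// Apply the documented recovery steps to one job owned by a stale worker.
fn recover(job: &LockedJob, now: SystemTime, recovery_delay: Duration) -> LockedJob {
    LockedJob {
        attempts: job.attempts.saturating_sub(1), // give back the attempt counted at claim time
        locked_by: None,                          // unlock the job
        queue_locked: false,                      // release its queue lock
        run_at: now + recovery_delay,             // delay the retry
    }
}
```

With the example settings, a 30 second heartbeat against a 300 second threshold tolerates several missed heartbeats before a worker is treated as dead.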
You can run the same kind of recovery manually from the CLI:
graphile-worker --database-url "$DATABASE_URL" sweep-stale-workers --dry-run
graphile-worker --database-url "$DATABASE_URL" sweep-stale-workers --sweep-threshold 5m --recovery-delay 30s
Use --dry-run first when investigating a production incident.
Cron
Cron entries are configured on the worker. Use the typed cron API when the task type is available in Rust:
use graphile_worker::{Cron, CrontabFill, WorkerOptions};
let worker = WorkerOptions::default()
.define_job::<SendDailyReport>()
.with_cron(
Cron::daily_at::<SendDailyReport>(8, 0)?
.fill(CrontabFill::hours(1)),
)
.database_url(&database_url)
.init()
.await?;
Crontab text is also supported:
let worker = graphile_worker::WorkerOptions::default()
.define_job::<SendDailyReport>()
.with_cron("0 8 * * * send_daily_report")?
.database_url(&database_url)
.init()
.await?;
For recurring jobs, decide which deployment owns each cron definition. Running the same cron configuration in multiple independent deployments can schedule the same recurring work more than intended unless the job key and cron settings are chosen to deduplicate the workload.
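Fill windows are easier to reason about with a fixed-period schedule. The sketch assumes fill behaves as in the Node.js Graphile Worker: after downtime, jobs are created for missed ticks that are no older than the fill window.

For example, with Cron::daily_at(8, 0) and CrontabFill::hours(1), a worker back online at 08:30 would backfill the 08:00 run. A worker back at 10:00 would not. ticks_to_fill is hypothetical and works in plain seconds, with ticks aligned to multiples of the period.

```rust
/// Hypothetical model of cron backfill for a fixed-period schedule.
/// Times are seconds since an arbitrary epoch; ticks fall on multiples of
/// `period_secs` (which must be non-zero). Returns the ticks in
/// (last_run, now] that are no older than `fill_secs`.
fn ticks_to_fill(period_secs: u64, last_run: u64, now: u64, fill_secs: u64) -> Vec<u64> {
    // Oldest instant we are still willing to backfill.
    let earliest = now.saturating_sub(fill_secs).max(last_run + 1);
    // First tick at or after `earliest`.
    let mut tick = earliest.div_ceil(period_secs) * period_secs;
    let mut ticks = Vec::new();
    while tick <= now {
        ticks.push(tick);
        tick += period_secs;
    }
    ticks
}
```

Take an hourly schedule that last ran at time 0 and comes back at 5h10m. With a two-hour fill window, it backfills only the 4h and 5h ticks. Older ticks are skipped.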
Local Queue
The local queue batch-fetches jobs from PostgreSQL and serves them from a local cache. This can reduce database round trips for high-throughput deployments, at the cost of keeping jobs in a process-local cache until they are claimed, returned, or the cache TTL expires.
use graphile_worker::{LocalQueueConfig, RefetchDelayConfig, WorkerOptions};
use std::time::Duration;
let worker = WorkerOptions::default()
.local_queue(
LocalQueueConfig::default()
.with_size(100)
.with_queue_count(2)
.with_ttl(Duration::from_secs(300))
.with_refetch_delay(
RefetchDelayConfig::default()
.with_duration(Duration::from_millis(100))
.with_threshold(10)
.with_max_abort_threshold(500),
),
)
.define_job::<SendEmail>()
.database_url(&database_url)
.init()
.await?;
size applies per local queue, so total local capacity is
size * queue_count. Workers configured with forbidden_flags bypass the local
queue and fetch jobs directly from the database. Benchmark realistic jobs before
turning this on for all production workers; the best settings depend on
PostgreSQL latency, pool size, worker concurrency, job duration, and the number
of replicas.
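The capacity rule and the refetch delay can be sketched as below. LocalQueueSketch is a hypothetical mirror of the documented knobs, not the real LocalQueueConfig.

The refetch rule is an assumption modeled on the Node.js Graphile Worker local queue: a fetch that returns no more than the threshold number of jobs triggers a pause before the next fetch. max_abort_threshold is left out.

```rust
use std::time::Duration;

/// Hypothetical mirror of the documented LocalQueueConfig knobs (not the real type).
struct LocalQueueSketch {
    size: usize,
    queue_count: usize,
    refetch_delay: Duration,
    refetch_threshold: usize,
}

impl LocalQueueSketch {
    /// `size` applies per local queue, so total cached capacity is size * queue_count.
    fn total_capacity(&self) -> usize {
        self.size * self.queue_count
    }

    /// Assumed rule: a small fetch suggests the database is nearly drained,
    /// so wait `refetch_delay` before fetching again.
    fn delay_after_fetch(&self, fetched: usize) -> Option<Duration> {
        (fetched <= self.refetch_threshold).then_some(self.refetch_delay)
    }
}
```

With the configuration shown above (size 100, queue_count 2), up to 200 jobs can sit in process-local caches per worker. Each replica adds its own 200.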
Observability
Graphile Worker emits useful events through lifecycle hooks and plugins. Use hooks for metrics, structured logs, tracing spans, validation, or custom recovery policy.
use graphile_worker::{
HookRegistry, JobComplete, JobFail, JobStart, Plugin, WorkerShutdown, WorkerStart,
};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[derive(Debug)]
struct MetricsPlugin {
jobs_started: AtomicU64,
jobs_completed: AtomicU64,
jobs_failed: AtomicU64,
}
impl Plugin for MetricsPlugin {
fn register(self, hooks: &mut HookRegistry) {
hooks.on(WorkerStart, async |ctx| {
tracing::info!(worker_id = %ctx.worker_id, "worker started");
});
let jobs_started = Arc::new(self.jobs_started);
let jobs_completed = Arc::new(self.jobs_completed);
let jobs_failed = Arc::new(self.jobs_failed);
{
let jobs_started = jobs_started.clone();
hooks.on(JobStart, move |_ctx| {
let jobs_started = jobs_started.clone();
async move {
jobs_started.fetch_add(1, Ordering::Relaxed);
}
});
}
{
let jobs_completed = jobs_completed.clone();
hooks.on(JobComplete, move |ctx| {
let jobs_completed = jobs_completed.clone();
async move {
jobs_completed.fetch_add(1, Ordering::Relaxed);
tracing::info!(duration = ?ctx.duration, "job completed");
}
});
}
{
let jobs_failed = jobs_failed.clone();
hooks.on(JobFail, move |ctx| {
let jobs_failed = jobs_failed.clone();
async move {
jobs_failed.fetch_add(1, Ordering::Relaxed);
tracing::warn!(error = %ctx.error, will_retry = ctx.will_retry, "job failed");
}
});
}
hooks.on(WorkerShutdown, async |ctx| {
tracing::info!(worker_id = %ctx.worker_id, reason = ?ctx.reason, "worker shutting down");
});
}
}
Register plugins at startup:
let worker = graphile_worker::WorkerOptions::default()
.add_plugin(MetricsPlugin {
jobs_started: AtomicU64::new(0),
jobs_completed: AtomicU64::new(0),
jobs_failed: AtomicU64::new(0),
})
.define_job::<SendEmail>()
.database_url(&database_url)
.init()
.await?;
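In the plugin above, register moves each AtomicU64 into an Arc it creates itself. After that, the rest of the application cannot read the counts. One way to keep them exportable is to store Arc<AtomicU64> handles in the plugin and keep a clone outside it, sketched here with only the standard library. JobCounters is hypothetical. In a real plugin, each hook closure would capture a clone and call the matching method.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical counter set shared between hook closures and an exporter.
/// Cloning shares the same atomics rather than copying the values.
#[derive(Clone, Default)]
struct JobCounters {
    started: Arc<AtomicU64>,
    completed: Arc<AtomicU64>,
    failed: Arc<AtomicU64>,
}

impl JobCounters {
    // Intended to be called from the JobStart, JobComplete, and JobFail hooks.
    fn on_start(&self) {
        self.started.fetch_add(1, Ordering::Relaxed);
    }

    fn on_complete(&self) {
        self.completed.fetch_add(1, Ordering::Relaxed);
    }

    fn on_fail(&self) {
        self.failed.fetch_add(1, Ordering::Relaxed);
    }

    /// (started, completed, failed) as seen by an exporter.
    fn snapshot(&self) -> (u64, u64, u64) {
        (
            self.started.load(Ordering::Relaxed),
            self.completed.load(Ordering::Relaxed),
            self.failed.load(Ordering::Relaxed),
        )
    }
}
```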
For operational inspection, the CLI can list jobs and inspect worker state:
graphile-worker --database-url "$DATABASE_URL" list --state ready
graphile-worker --database-url "$DATABASE_URL" stats
graphile-worker --database-url "$DATABASE_URL" queues
graphile-worker --database-url "$DATABASE_URL" workers
Deployment Checklist
- Provide DATABASE_URL or a configured PostgreSQL pool.
- Pick a schema and run migrations with init() or the CLI migrate command.
- Set concurrency explicitly for predictable behavior across instance sizes.
- Size database pools for all replicas and job-handler database work.
- Ensure SIGTERM reaches the worker process and the orchestrator grace period is longer than the worker grace period.
- Enable recovery when forced exits or node failures are part of your failure model.
- Assign cron ownership deliberately.
- Turn on local queue only after measuring throughput and latency with realistic jobs.
- Add lifecycle hooks or plugins for logs, metrics, and failure visibility.
Observability
Graphile Worker RS exposes observability through standard Rust tracing, optional
OpenTelemetry compatibility features, and lifecycle hooks. The worker does not
force a logging or metrics backend on applications; install the subscriber,
exporter, and hook plugin that match your runtime.
Tracing
The main crate depends on tracing, so worker internals and application task
handlers can emit spans and events into the subscriber you install. The hooks
example uses tracing-subscriber with an environment filter and a formatted
output layer:
// The prelude brings SubscriberExt (`.with`) and SubscriberInitExt (`.init`) into scope.
use tracing_subscriber::{filter::EnvFilter, prelude::*};
pub fn enable_logs() {
let fmt_layer = tracing_subscriber::fmt::layer();
// Debug level everywhere, but only warnings and above from SQLx.
let filter_layer = EnvFilter::try_new("debug,sqlx=warn").expect("valid filter directives");
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.init();
}
Initialize the subscriber once, before starting the worker. The example keeps
SQLx noise at warn while enabling debug logging elsewhere.
OpenTelemetry compatibility
OpenTelemetry support is feature-gated. Enable exactly one compatibility feature:
[dependencies]
graphile_worker = { version = "0.13", features = ["opentelemetry_0_32"] }
Available compatibility features are:
| Feature | OpenTelemetry crate version |
|---|---|
| opentelemetry_0_30 | opentelemetry 0.30 |
| opentelemetry_0_31 | opentelemetry 0.31 |
| opentelemetry_0_32 | opentelemetry 0.32 |
The crate rejects builds that enable more than one of these features at the same time.
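The source does not show how the crate enforces this. A common pattern is a compile-time check like the one below, written as it might appear inside a crate that declares these features. It is an illustration, not code copied from graphile_worker. An equivalent approach uses #[cfg(all(...))] with compile_error!.

```rust
/// Number of OpenTelemetry compatibility features enabled for this crate.
/// `cfg!` expands to a literal bool, so this is a compile-time constant.
const ENABLED_OTEL_FEATURES: u8 = cfg!(feature = "opentelemetry_0_30") as u8
    + cfg!(feature = "opentelemetry_0_31") as u8
    + cfg!(feature = "opentelemetry_0_32") as u8;

// Evaluated at compile time: the build fails if more than one line is selected.
const _: () = assert!(
    ENABLED_OTEL_FEATURES <= 1,
    "enable at most one opentelemetry_0_* feature"
);
```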
When an OpenTelemetry feature is enabled, job insertion code can capture the
current span context and write it into the job payload under _trace. Object
payloads receive the field directly. Array payloads receive it on each object
item in the array, while scalar values are left unchanged.
{
"user_id": 42,
"_trace": {
"flags": 1,
"trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
"span_id": "00f067aa0ba902b7"
}
}
When the worker later runs a job, it reads _trace from object payloads and
adds the recorded span context as a link to the current span. Invalid,
non-object, or missing trace payloads are ignored. Without an OpenTelemetry
feature, this trace extraction and linking path is a no-op.
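The injection and extraction rules can be expressed against a minimal JSON type. The Json enum below stands in for serde_json::Value so the sketch needs only the standard library. inject_trace and extract_trace are hypothetical names. The real implementation also validates the trace and span ids, which this sketch skips.

```rust
use std::collections::BTreeMap;

/// Minimal stand-in for a JSON value (a real application would use serde_json::Value).
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Number(i64),
    Str(String),
    Array(Vec<Json>),
    Object(BTreeMap<String, Json>),
}

/// Objects get `_trace` directly, arrays get it on each object item,
/// and scalar payloads are left unchanged.
fn inject_trace(payload: &mut Json, trace: &Json) {
    match payload {
        Json::Object(map) => {
            map.insert("_trace".to_string(), trace.clone());
        }
        Json::Array(items) => {
            for item in items.iter_mut() {
                if let Json::Object(map) = item {
                    map.insert("_trace".to_string(), trace.clone());
                }
            }
        }
        _ => {}
    }
}

/// Worker side: only object payloads are inspected; anything else yields None.
fn extract_trace(payload: &Json) -> Option<&Json> {
    match payload {
        Json::Object(map) => map.get("_trace"),
        _ => None,
    }
}
```

Note that an array payload is never read back as a whole. extract_trace returns None for it, matching the rule that only object payloads are inspected.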
Metrics with lifecycle hooks
Lifecycle hooks are the extension point for metrics and structured operational
events. A plugin registers callbacks on HookRegistry; those callbacks receive
context objects for worker and job events.
The metrics example counts started, completed, and failed jobs with atomics:
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use graphile_worker::{
HookRegistry, JobComplete, JobFail, JobStart, Plugin, WorkerShutdown, WorkerStart,
};
pub struct MetricsPlugin {
jobs_started: AtomicU64,
jobs_completed: AtomicU64,
jobs_failed: AtomicU64,
}
impl Plugin for MetricsPlugin {
fn register(self, hooks: &mut HookRegistry) {
hooks.on(WorkerStart, async |ctx| {
println!("[MetricsPlugin] Worker {} started", ctx.worker_id);
});
let jobs_started = Arc::new(self.jobs_started);
let jobs_completed = Arc::new(self.jobs_completed);
let jobs_failed = Arc::new(self.jobs_failed);
{
let jobs_started = jobs_started.clone();
hooks.on(JobStart, move |ctx| {
let jobs_started = jobs_started.clone();
async move {
jobs_started.fetch_add(1, Ordering::Relaxed);
println!(
"[MetricsPlugin] Job {} started (task: {})",
ctx.job.id(),
ctx.job.task_identifier()
);
}
});
}
{
let jobs_completed = jobs_completed.clone();
hooks.on(JobComplete, move |ctx| {
let jobs_completed = jobs_completed.clone();
async move {
jobs_completed.fetch_add(1, Ordering::Relaxed);
println!(
"[MetricsPlugin] Job {} completed in {:?}",
ctx.job.id(),
ctx.duration
);
}
});
}
{
let jobs_failed = jobs_failed.clone();
hooks.on(JobFail, move |ctx| {
let jobs_failed = jobs_failed.clone();
async move {
jobs_failed.fetch_add(1, Ordering::Relaxed);
println!(
"[MetricsPlugin] Job {} failed: {} (will_retry: {})",
ctx.job.id(),
ctx.error,
ctx.will_retry
);
}
});
}
hooks.on(WorkerShutdown, move |ctx| async move {
println!(
"[MetricsPlugin] Worker {} shutting down (reason: {:?})",
ctx.worker_id, ctx.reason
);
});
}
}
Use the same hook points to export counters, histograms, or structured events to your metrics backend:
| Hook | Useful signal |
|---|---|
| WorkerStart | worker process started and has a worker id |
| WorkerShutdown | worker process is stopping and exposes the shutdown reason |
| JobStart | job started executing; exposes the job id and task identifier |
| JobComplete | job completed and exposes execution duration |
| JobFail | job failed; exposes the error and whether it will retry |
Logging
For application logs, combine a subscriber with hook callbacks and task-handler instrumentation. The hook contexts expose job ids, task identifiers, errors, durations, worker ids, and shutdown reasons, which are the stable values to put into operational logs.
For example, a JobFail hook can log the job id, error, and retry decision;
JobComplete can log duration; WorkerShutdown can log the shutdown reason.
The example code prints these values with println!, but production code will
usually emit tracing events or send them to a metrics/logging client.
Admin and monitor visibility
The workspace includes graphile_worker_admin_api,
graphile_worker_admin_ui, and graphile_worker_admin_ui_client crates. Use
those admin surfaces for queue visibility, and use hooks and tracing for
process-local signals that admin views cannot infer on their own, such as
per-process shutdown reasons, local counters, and span links.
For runtime monitoring, the most useful signals visible from the current public hooks are:
| Signal | Source |
|---|---|
| Worker started | WorkerStart hook |
| Worker shutdown reason | WorkerShutdown hook |
| Job throughput | JobStart and JobComplete hooks |
| Job failure rate | JobFail hook |
| Job duration | JobComplete hook |
| Trace continuity from enqueue to execution | _trace payload plus OpenTelemetry span links |
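Throughput and failure rate come from differences between two counter readings. JobFail reports will_retry, so it presumably fires for retried failures as well. The rate below is therefore per attempt rather than per job. Reading and both helpers are hypothetical exporter-side code.

```rust
use std::time::Duration;

/// One reading of cumulative hook counters (hypothetical exporter-side type).
#[derive(Clone, Copy)]
struct Reading {
    completed: u64,
    failed: u64,
}

/// Completed jobs per second between two readings taken `elapsed` apart.
fn throughput_per_sec(before: Reading, after: Reading, elapsed: Duration) -> f64 {
    let done = after.completed.saturating_sub(before.completed);
    done as f64 / elapsed.as_secs_f64()
}

/// Share of finished attempts that failed in the window; None if nothing finished.
fn failure_rate(before: Reading, after: Reading) -> Option<f64> {
    let completed = after.completed.saturating_sub(before.completed);
    let failed = after.failed.saturating_sub(before.failed);
    let finished = completed + failed;
    (finished > 0).then(|| failed as f64 / finished as f64)
}
```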
Reference
Use this section when you already know the concept you need and want exact package, feature, API, compatibility, or troubleshooting details.
The guide pages explain how to build common workflows. The reference pages are for looking up the supported surface area and checking which crate, feature flag, or documentation source applies to your use case.
docs.rs and this guide
The generated Rust API documentation is published on
docs.rs. Use docs.rs when you need item-level
details for public Rust types, traits, builders, and re-exports from the
graphile_worker crate.
Use this mdBook guide when you need operational or architectural context:
- Worker options explains how to assemble a worker configuration.
- Runtime, TLS, and drivers explains how feature selections affect database access.
- Migrations covers schema management.
- Observability covers tracing and runtime visibility.
- Troubleshooting collects common failure modes.
For example, look up the exact methods and trait bounds on docs.rs, then return to the guide for the surrounding setup:
use graphile_worker::{TaskHandler, WorkerOptions};
// Check docs.rs for the exact public API of each type.
// Check the guide for configuration examples and operational context.
Public crate surface
The top-level graphile_worker crate re-exports the main types used by
applications:
- Worker for running workers.
- WorkerOptions, CronInput, and WorkerBuildError for configuration.
- WorkerUtils for job management helpers.
- TaskHandler and related task handling types.
- Job and job specification types from the job crates.
- Context, database, lifecycle hook, shutdown signal, and cron types.
- LocalQueue and its configuration and error types.
- Worker recovery types such as WorkerRecoveryConfig and SweepStaleWorkersOptions.
The workspace also contains smaller crates for specific responsibilities. See Crate Map when you need to know which package owns a type or feature area.
Feature flags
The default graphile_worker feature set enables:
default = ["runtime-tokio", "tls-rustls", "driver-sqlx"]
The visible feature groups are:
- Runtime: runtime-tokio, runtime-async-std.
- TLS: tls-rustls, tls-native-tls.
- Database driver: driver-sqlx, driver-tokio-postgres.
- OpenTelemetry compatibility: opentelemetry_0_30, opentelemetry_0_31, opentelemetry_0_32.
Start with the defaults unless you have a specific runtime, TLS, database driver, or OpenTelemetry version requirement. See Feature Flags and Runtime, TLS, and Drivers before changing them.
Compatibility checklist
Before changing a dependency or feature set, check the compatibility page for the constraints that matter across the workspace:
- Runtime feature enabled.
- Database driver feature enabled.
- TLS backend enabled.
- OpenTelemetry compatibility feature matching your tracing stack.
See Compatibility for the reference checklist.
Troubleshooting entry points
For setup or runtime issues, start with the page closest to the symptom:
- Worker configuration problems: Worker Options.
- Database schema or migration problems: Migrations.
- Shutdown behavior: Shutdown.
- Application state and extensions: Application State and Extensions.
- General reference issues: Troubleshooting.
When the issue is about a specific Rust type or trait, use the API Reference page as the bridge to docs.rs.
Feature Flags
The graphile_worker crate uses feature flags to select the async runtime, TLS
implementation, PostgreSQL driver, and optional OpenTelemetry compatibility
version.
Default features enable the most common setup:
[dependencies]
graphile_worker = "0.13"
This is equivalent to enabling:
[dependencies]
graphile_worker = { version = "0.13", features = ["runtime-tokio", "tls-rustls", "driver-sqlx"] }
Defaults
The default feature set is:
- runtime-tokio
- tls-rustls
- driver-sqlx
Use default-features = false when you want to choose a different runtime,
driver, or TLS backend.
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["runtime-async-std", "tls-rustls", "driver-sqlx"] }
Runtime Features
Runtime features select the async runtime used by the root crate and its internal crates.
| Feature | Purpose |
|---|---|
| runtime-tokio | Enables Tokio runtime support. This is enabled by default. |
| runtime-async-std | Enables async-std runtime support. |
The driver-tokio-postgres feature also enables runtime-tokio, so that driver
is a Tokio-only combination in the root crate feature graph.
TLS Features
TLS features select the TLS implementation forwarded to database-related crates and, when SQLx is enabled, to SQLx.
| Feature | Purpose |
|---|---|
| tls-rustls | Enables rustls TLS support. This is enabled by default. |
| tls-native-tls | Enables native-tls support. |
The root feature list does not force exactly one TLS backend. Pick the one you intend to use instead of enabling both accidentally.
Database Driver Features
Driver features select the PostgreSQL access layer used by database-related crates.
| Feature | Purpose |
|---|---|
| driver-sqlx | Enables SQLx support and the optional sqlx dependency. This is enabled by default. |
| driver-tokio-postgres | Enables tokio-postgres support and also enables runtime-tokio. |
SQLx is the default driver. The tokio-postgres driver is available as an alternative Tokio-based driver.
OpenTelemetry Features
OpenTelemetry support is versioned so downstream applications can choose the compatibility line they use.
| Feature | Dependencies enabled |
|---|---|
| opentelemetry_0_30 | opentelemetry 0.30 and matching tracing-opentelemetry support |
| opentelemetry_0_31 | opentelemetry 0.31 and matching tracing-opentelemetry support |
| opentelemetry_0_32 | opentelemetry 0.32 and matching tracing-opentelemetry support |
These features are not enabled by default. Enable the one that matches the OpenTelemetry version used by the rest of your application.
[dependencies]
graphile_worker = { version = "0.13", features = ["opentelemetry_0_32"] }
Common Combinations
Default Tokio, rustls, SQLx
Use the default dependency declaration when you want Tokio, rustls, and SQLx.
[dependencies]
graphile_worker = "0.13"
Tokio, native-tls, SQLx
Disable defaults and select the TLS backend explicitly.
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["runtime-tokio", "tls-native-tls", "driver-sqlx"] }
async-std, rustls, SQLx
Use the async-std runtime with the SQLx driver.
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["runtime-async-std", "tls-rustls", "driver-sqlx"] }
Tokio, tokio-postgres
The tokio-postgres driver enables runtime-tokio through the feature graph.
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["driver-tokio-postgres"] }
If your connection setup needs TLS, also select the TLS feature you intend to use:
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["driver-tokio-postgres", "tls-rustls"] }
Tokio, SQLx, OpenTelemetry 0.32
OpenTelemetry features can be added to the default feature set.
[dependencies]
graphile_worker = { version = "0.13", features = ["opentelemetry_0_32"] }
Feature Matrix Used by Local Checks
The repository's runtime matrix checks the following combinations:
just test-docker-all-matrices
That command covers:
| Runtime | Driver | TLS |
|---|---|---|
| runtime-tokio | driver-sqlx | tls-rustls |
| runtime-async-std | driver-sqlx | tls-rustls |
| runtime-tokio | driver-tokio-postgres | not selected by the test recipe |
The test-runtime recipe rejects driver-tokio-postgres with
runtime-async-std, matching the feature relationship where
driver-tokio-postgres enables Tokio.
Cautions
Disable defaults when replacing a default feature. For example, adding
tls-native-tls without default-features = false leaves tls-rustls enabled
too.
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["runtime-tokio", "tls-native-tls", "driver-sqlx"] }
Choose one database driver unless you have checked that your application really
needs both. The default driver is SQLx, so enabling driver-tokio-postgres
without disabling defaults enables both drivers.
[dependencies]
graphile_worker = { version = "0.13", default-features = false, features = ["driver-tokio-postgres"] }
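Both cautions follow from Cargo combining requested features with the defaults. The sketch below models only the relationships stated on this page: the default set, and driver-tokio-postgres enabling runtime-tokio. resolve and warnings are hypothetical; this is not how Cargo's resolver is implemented.

```rust
use std::collections::BTreeSet;

const DEFAULTS: [&str; 3] = ["runtime-tokio", "tls-rustls", "driver-sqlx"];

/// Simplified model of the enabled feature set for graphile_worker.
fn resolve(default_features: bool, requested: &[&'static str]) -> BTreeSet<&'static str> {
    let mut enabled: BTreeSet<&'static str> = requested.iter().copied().collect();
    if default_features {
        enabled.extend(DEFAULTS);
    }
    // Documented implication: the tokio-postgres driver enables Tokio.
    if enabled.contains("driver-tokio-postgres") {
        enabled.insert("runtime-tokio");
    }
    enabled
}

/// Flags the accidental combinations called out in the cautions above.
fn warnings(enabled: &BTreeSet<&'static str>) -> Vec<&'static str> {
    let mut out = Vec::new();
    if enabled.contains("tls-rustls") && enabled.contains("tls-native-tls") {
        out.push("both TLS backends enabled");
    }
    if enabled.contains("driver-sqlx") && enabled.contains("driver-tokio-postgres") {
        out.push("both database drivers enabled");
    }
    if enabled.contains("runtime-async-std") && enabled.contains("driver-tokio-postgres") {
        out.push("driver-tokio-postgres pulls in Tokio alongside async-std");
    }
    out
}
```

Requesting tls-native-tls while keeping defaults still leaves tls-rustls enabled. Setting default-features = false with only driver-tokio-postgres yields a clean Tokio plus tokio-postgres set.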
Use an OpenTelemetry feature only when your dependency graph uses that matching OpenTelemetry line. The root crate exposes separate compatibility features for 0.30, 0.31, and 0.32.
Illustrative Rust examples in this documentation may require additional setup, such as task handlers, worker options, and a database URL. See the quick start for a complete first worker.
// Feature selection happens in Cargo.toml, not in Rust source.
use graphile_worker::WorkerOptions;
Crate Map
Graphile Worker RS is published as a main application crate plus smaller crates
that hold reusable pieces of the worker, cron, migration, admin, and database
layers. Most applications should depend on graphile_worker; the other crates
are useful when you need a narrower API surface, are building tooling around the
worker schema, or are working on Graphile Worker RS itself.
Main User-Facing Crate
| Crate | Use it for |
|---|---|
| graphile_worker | Running workers, adding jobs, defining tasks, configuring cron, applying migrations, and using the public worker API from one dependency. |
The root crate re-exports the core types that applications normally need:
- Worker and WorkerOptions for worker setup.
- WorkerUtils for job management.
- TaskHandler and related task handler types.
- JobSpec and job data types.
- Cron, crontab parser, and crontab value types.
- Lifecycle hook, context, database, and shutdown signal types.
- Dead worker recovery types such as SweepStaleWorkersOptions.
For a normal worker binary, start here:
use graphile_worker::WorkerOptions;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let worker = WorkerOptions::default()
.database_url("postgres://postgres:postgres@localhost/postgres")
.define_job::<SendEmail>()
.init()
.await?;
worker.run().await?;
Ok(())
}
For job management without running a worker loop, use WorkerUtils from the
same crate:
use graphile_worker::{JobSpec, WorkerUtils};
async fn enqueue(utils: &WorkerUtils) -> graphile_worker::errors::Result<()> {
utils
.add_raw_job(
"send_email",
serde_json::json!({ "to": "user@example.com" }),
JobSpec::builder().queue_name("mailers").build(),
)
.await?;
Ok(())
}
User-Facing Building Blocks
These crates provide the types application code is most likely to see through
graphile_worker re-exports. Depend on them directly only when you are sharing
small interfaces across crates and do not want the full worker dependency.
| Crate | What it contains |
|---|---|
| graphile_worker_task_handler | TaskHandler and reusable JobDefinition values for defining job handlers. |
| graphile_worker_job_spec | Job options such as queue name, run time, max attempts, priority, and deduplication keys. |
| graphile_worker_job | Job row/data types used by handlers, hooks, queries, and utilities. |
| graphile_worker_ctx | Worker context passed through job execution. |
| graphile_worker_extensions | Extension storage used to attach and retrieve shared application state. |
| graphile_worker_lifecycle_hooks | Hook registry and lifecycle events around worker startup, job execution, completion, and cron ticks. |
| graphile_worker_shutdown_signal | Cross-platform shutdown signal handling for graceful worker shutdown. |
For example, a helper crate that only defines task handlers can depend on the task handler and context crates instead of depending on the whole worker:
use graphile_worker_ctx::WorkerContext;
use graphile_worker_task_handler::{IntoTaskHandlerResult, TaskHandler};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
let _ = (self.to, ctx);
Ok::<(), String>(())
}
}
Cron Crates
Cron support is split into parser, shared types, and runner pieces.
| Crate | What it contains |
|---|---|
| graphile_worker_crontab_types | Crontab value types and utilities, including timers, fields, fills, and job key mode types. |
| graphile_worker_crontab_parser | Parser for crontab configuration. |
| graphile_worker_crontab_runner | Runner that schedules crontab entries against the worker database and lifecycle hooks. |
Applications usually get these through graphile_worker, which re-exports
parse_crontab, Cron, CronBuilder, Crontab, CrontabTimer, and related
types.
Support Crates
These crates are useful for tools, integrations, and advanced applications that need to manage jobs or schema state without pulling every top-level worker API into their own public surface.
| Crate | What it contains |
|---|---|
| graphile_worker_utils | Job management helpers used by WorkerUtils, including adding, removing, rescheduling, and other worker utility operations. |
| graphile_worker_queries | Database query helpers for Graphile Worker job and worker operations. |
| graphile_worker_recovery | Dead worker recovery helpers, including stale worker sweeping and recovery configuration. |
| graphile_worker_migrations | Database migrations for the Graphile Worker schema. |
The root crate's documentation describes the managed schema as using the
graphile_worker schema by default, with private jobs, tasks, job queues, and
workers tables. These support crates are the lower-level pieces used to manage
that schema and operate on its rows.
Internal-ish Infrastructure Crates
These crates are part of the workspace API surface, but most applications should not need to depend on them directly.
| Crate | What it contains |
|---|---|
| graphile_worker_database | Database driver abstraction used by worker, migrations, queries, recovery, cron, and utilities. It has feature support for sqlx and tokio-postgres drivers. |
| graphile_worker_runtime | Async runtime compatibility helpers for Tokio and async-std feature combinations. |
| graphile_worker_task_details | Shared mapping between task IDs and task identifiers. |
| graphile_worker_migrations_core | Core migration types used by the migrations crate. |
| graphile_worker_migrations_macros | Procedural macros used by the migrations crate. |
These crates exist to keep feature flags and dependencies focused. For example,
database-facing crates share the driver-sqlx, driver-tokio-postgres,
runtime-tokio, runtime-async-std, tls-rustls, and tls-native-tls feature
families instead of hard-coding one driver and runtime everywhere.
Admin And CLI Crates
The workspace also includes crates for operational tools around a Graphile Worker installation.
| Crate | What it contains |
|---|---|
| graphile_worker_cli | The graphile-worker binary for migrations, job management, admin server startup, stats, queues, workers, cleanup, and stale worker sweeping. |
| graphile_worker_admin_api | Shared admin API contracts and queries. |
| graphile_worker_admin_ui | Embedded Leptos admin UI and server-side assets. |
| graphile_worker_admin_ui_client | Leptos WASM client for the embedded admin UI. |
The CLI binary is installed as graphile-worker and connects with
--database-url or DATABASE_URL. Its README lists commands such as:
graphile-worker migrate
graphile-worker add send_email --payload '{"to":"user@example.com"}'
graphile-worker list --state ready
graphile-worker complete 123 124
graphile-worker fail 125 --reason "invalid payload"
graphile-worker reschedule 126 --run-at 2026-01-02T03:04:05Z
graphile-worker sweep-stale-workers --dry-run
Choosing A Dependency
Use graphile_worker unless you have a concrete reason not to. It is the stable
entry point for application code and re-exports the main worker, task, job,
cron, migration, hook, database, recovery, and utility types.
Use a smaller crate directly when:
- A library crate only needs to expose task handler types.
- A tool only needs job specs, job row types, or query helpers.
- An integration needs admin API contracts without embedding the UI.
- You are extending Graphile Worker RS itself and need the same internal boundaries as the workspace.
API Reference
This page is a map of the public Rust API. For complete signatures, trait bounds, and generated item documentation, use the docs.rs links in each section.
Most applications should start with the root crate:
- graphile_worker - worker setup, task registration, job scheduling, cron helpers, lifecycle hooks, shutdown, recovery, and the most commonly used re-exports.
Worker Setup
Use WorkerOptions to configure and initialize a worker, then call Worker::run or Worker::run_once.
Important public types:
- WorkerOptions - builder-style worker configuration.
- WorkerBuildError - initialization errors.
- Worker - initialized worker runtime.
- WorkerShutdownConfig - graceful shutdown behavior.
- ShutdownSignal - shareable shutdown future.
Common configuration methods include:
- database_url, database, and pg_pool for database connections.
- schema, concurrency, poll_interval, max_pg_conn, and use_local_time for core worker settings.
- define_job, define_batch_job, and define_jobs for task registration.
- add_forbidden_flag for workers that skip jobs with specific flags.
- local_queue, complete_job_batch_delay, and fail_job_batch_delay for throughput tuning.
- worker_recovery, heartbeat_interval, sweep_interval, sweep_threshold, and recovery_delay for dead worker recovery.
- listen_os_shutdown_signals, shutdown_signal, shutdown_grace_period, and shutdown_interrupted_job_retry_delay for shutdown control.
use graphile_worker::{
IntoTaskHandlerResult, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
Ok::<(), String>(())
}
}
let worker = WorkerOptions::default()
.database_url("postgres://postgres:postgres@localhost/postgres")
.schema("graphile_worker")
.concurrency(8)
.poll_interval(Duration::from_millis(500))
.define_job::<SendEmail>()
.init()
.await?;
worker.run().await?;
Task Handlers
Task handler APIs live in graphile_worker_task_handler and are re-exported by graphile_worker.
Important public types:
- TaskHandler - trait for a single job payload type.
- IntoTaskHandlerResult - conversion trait for task return values such as () and Result<(), E>.
- BatchTaskHandler - trait for JSON-array batch payloads.
- BatchTaskResult and IntoBatchTaskHandlerResult - batch completion and partial failure results.
- JobDefinition - reusable task registration value.
- TaskHandlerOutcome and TaskHandlerFn - type-erased handler output and function type.
- run_task_from_worker_ctx - helper that deserializes and runs a TaskHandler from WorkerContext.
Batch handlers receive their items as a Vec<Self> deserialized from a JSON-array
payload. BatchTaskResult::ItemResults must contain one result for each input
item, in the same order; failed items are retried as a replacement JSON-array
payload.
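To make the ordering rule concrete, here is a minimal, std-only sketch of how per-item results line up with the input items and which items would end up in the replacement payload. It is a hypothetical model of the contract described above, not the crate's implementation; `split_batch` and `BatchOutcome` are illustrative names.

```rust
/// Hypothetical summary of one batch run (illustrative, not a graphile_worker type).
#[derive(Debug, PartialEq)]
pub struct BatchOutcome<T> {
    /// Number of items whose result was Ok.
    pub succeeded: usize,
    /// Failed items in their original order; this models the
    /// replacement JSON-array payload that would be retried.
    pub retry_payload: Vec<T>,
}

/// Pairs each input item with the result at the same index.
/// Returns an error if there is not exactly one result per item.
pub fn split_batch<T, E>(
    items: Vec<T>,
    results: Vec<Result<(), E>>,
) -> Result<BatchOutcome<T>, String> {
    if items.len() != results.len() {
        return Err(format!(
            "expected {} results, got {}",
            items.len(),
            results.len()
        ));
    }
    let mut succeeded = 0;
    let mut retry_payload = Vec::new();
    for (item, result) in items.into_iter().zip(results) {
        match result {
            Ok(()) => succeeded += 1,
            Err(_) => retry_payload.push(item),
        }
    }
    Ok(BatchOutcome { succeeded, retry_payload })
}
```

With the IndexRecord rule above, ids `[1, -2, 3, 0]` would yield two successes and a retry payload of `[-2, 0]`. A length mismatch is treated as an error here because the pairing by position stops making sense.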
use graphile_worker::{
    BatchTaskHandler, IntoBatchTaskHandlerResult, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};
#[derive(Clone, Deserialize, Serialize)]
struct IndexRecord {
    id: i64,
}
impl BatchTaskHandler for IndexRecord {
    const IDENTIFIER: &'static str = "index_record";
    async fn run_batch(
        items: Vec<Self>,
        _ctx: WorkerContext,
    ) -> impl IntoBatchTaskHandlerResult {
        // Return one result per item, in input order; failed items are retried.
        items
            .into_iter()
            .map(|item| {
                if item.id > 0 {
                    Ok(())
                } else {
                    Err("invalid id")
                }
            })
            .collect::<Vec<_>>()
    }
}
let options = WorkerOptions::default().define_batch_job::<IndexRecord>();
Job Scheduling And Management
Use WorkerUtils to add, update, and maintain jobs without running a worker loop in the same code path.
Important public types:
- WorkerUtils - client for scheduling and maintenance operations.
- JobSpec and JobSpecBuilder - optional job parameters.
- JobKeyMode - job-key behavior: Replace, PreserveRunAt, or UnsafeDedupe.
- RawJobSpec - raw batch scheduling input.
- Job, JobBuilder, DbJob, and DbJobData - job records returned by scheduling and management APIs.
- CleanupTask - cleanup operations from graphile_worker_utils::types.
- RescheduleJobOptions - optional fields for rescheduling jobs.
Common WorkerUtils methods include:
- add_job, add_raw_job, add_jobs, add_raw_jobs, and add_batch_job for scheduling.
- remove_job, complete_jobs, permanently_fail_jobs, and reschedule_jobs for job management.
- list_active_workers, sweep_stale_workers, sweep_stale_workers_with_config, and force_unlock_workers for worker maintenance.
- cleanup and migrate for database maintenance.
use graphile_worker::{JobKeyMode, JobSpec, WorkerUtils};
use serde_json::json;
// `utils` is a WorkerUtils, for example from `worker.create_utils()`.
let spec = JobSpec::builder()
    .queue_name("email")
    .job_key("welcome-email:user-123")
    .job_key_mode(JobKeyMode::PreserveRunAt)
    .max_attempts(5)
    .priority(-10)
    .build();
let job = utils
    .add_raw_job(
        "send_email",
        json!({ "to": "user@example.com" }),
        spec,
    )
    .await?;
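The three job-key modes differ in what happens when a job with the same key already exists. The std-only sketch below models the behavior documented for Node Graphile Worker's `job_key_mode`, which this crate is assumed to share. Replace overwrites an unlocked job, including run_at. PreserveRunAt overwrites it but keeps its run_at. UnsafeDedupe leaves any existing job untouched. For the two overwrite modes, a locked job causes a new job to be scheduled instead. `KeyMode`, `KeyedJob`, and `add_keyed` are hypothetical names, run_at is a plain integer, and edge cases such as permanently failed jobs are not modeled.

```rust
/// Local stand-in for the job-key modes (not graphile_worker::JobKeyMode).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum KeyMode {
    Replace,
    PreserveRunAt,
    UnsafeDedupe,
}

/// Simplified view of an existing job that holds the same job key.
#[derive(Clone, Debug, PartialEq)]
pub struct KeyedJob {
    pub payload: String,
    pub run_at: u64,
    pub locked: bool,
}

/// What a keyed add would do in this simplified model.
#[derive(Debug, PartialEq)]
pub enum KeyedAdd {
    /// The existing job is left exactly as it was.
    Unchanged,
    /// The existing unlocked job is overwritten with these values.
    Updated(KeyedJob),
    /// The existing job is locked, so a separate new job is scheduled.
    NewJob(KeyedJob),
}

pub fn add_keyed(existing: &KeyedJob, mode: KeyMode, payload: &str, run_at: u64) -> KeyedAdd {
    let fresh = KeyedJob { payload: payload.to_string(), run_at, locked: false };
    match mode {
        // Dedupe never touches the existing job, locked or not.
        KeyMode::UnsafeDedupe => KeyedAdd::Unchanged,
        // Overwrite modes cannot modify a job that a worker is running.
        _ if existing.locked => KeyedAdd::NewJob(fresh),
        KeyMode::Replace => KeyedAdd::Updated(fresh),
        // Keep the original run_at, take everything else from the new call.
        KeyMode::PreserveRunAt => KeyedAdd::Updated(KeyedJob { run_at: existing.run_at, ..fresh }),
    }
}
```

In this model, PreserveRunAt suits throttling because repeated adds do not push the job later, while Replace suits debouncing because each add moves run_at forward.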
Worker Context And Extensions
Context APIs live in graphile_worker_ctx and are re-exported by graphile_worker.
Important public types:
- WorkerContext - runtime context passed to task handlers.
- WorkerContextBuilder - builder for creating contexts.
- WorkerContextExt - root-crate extension helpers for worker context.
- TaskDetails and SharedTaskDetails - task identifier mappings.
- Extensions and ReadOnlyExtensions from graphile_worker_extensions - typed application state storage.
Add shared state with WorkerOptions::add_extension. Task handlers receive a
WorkerContext and can use the context APIs described in the generated API
documentation.
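"Typed application state storage" generally means a map keyed by Rust type, so each type has at most one stored value and is looked up by type rather than by name. The std-only sketch below illustrates that idea. It is not how graphile_worker_extensions is implemented, and `TypeMap` is a hypothetical name.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Minimal type-keyed map: at most one value per Rust type.
#[derive(Default)]
pub struct TypeMap {
    values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl TypeMap {
    /// Stores `value`, replacing any previous value of the same type.
    pub fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.values.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Returns the stored value of type `T`, if one was inserted.
    pub fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.values
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}
```

The `Send + Sync` bounds reflect that shared state may be read from many concurrently running jobs. Wrapping application types in a dedicated struct (for example a hypothetical `AppConfig`) avoids collisions between two values of the same primitive type.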
Cron
Cron support is exposed from the root crate and split across three crates:
- graphile_worker_crontab_parser - parse_crontab and CrontabParseError.
- graphile_worker_crontab_types - Crontab, CrontabField, CrontabFill, CrontabTimer, CrontabTimerError, CrontabValue, and cron JobKeyMode.
- graphile_worker_crontab_runner - CronRunner, cron_main, Clock, MockClock, KnownCrontab, and ScheduleCronJobError.
The root crate also re-exports:
- Cron and CronBuilder for typed schedules.
- CronInput for values accepted by WorkerOptions::with_cron.
- CronJobKeyMode, which is the crontab JobKeyMode re-exported under a distinct root-crate name.
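use graphile_worker::{Cron, CrontabFill, WorkerOptions};
// Typed schedule: SendEmail daily at 08:00, with a one-hour fill setting.
let options = WorkerOptions::default()
    .define_job::<SendEmail>()
    .with_cron(
        Cron::daily_at::<SendEmail>(8, 0)?
            .fill(CrontabFill::hours(1)),
    );
// The same task and time as a crontab-style line (no fill option):
let options = WorkerOptions::default()
    .define_job::<SendEmail>()
    .with_cron("0 8 * * * send_email")?;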
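The string form follows the usual crontab layout. It starts with five time fields (minute, hour, day of month, month, day of week), followed by the task identifier, so "0 8 * * * send_email" names send_email at minute 0 of hour 8 every day. The std-only sketch below only splits a line into those pieces. It does not validate fields and is not the crate's parse_crontab. `split_cron_line` and `CronLine` are hypothetical names, and anything after the identifier (such as options or a payload) is kept as unparsed tokens.

```rust
/// Pieces of one crontab line (illustrative; not graphile_worker's Crontab type).
#[derive(Debug, PartialEq)]
pub struct CronLine<'a> {
    /// minute, hour, day of month, month, day of week
    pub time_fields: [&'a str; 5],
    pub task_identifier: &'a str,
    /// Remaining whitespace-separated tokens, left unparsed.
    pub extra: Vec<&'a str>,
}

pub fn split_cron_line(line: &str) -> Option<CronLine<'_>> {
    let line = line.trim();
    // Blank lines and comment lines carry no schedule.
    if line.is_empty() || line.starts_with('#') {
        return None;
    }
    let tokens: Vec<&str> = line.split_whitespace().collect();
    // Need five time fields plus a task identifier.
    if tokens.len() < 6 {
        return None;
    }
    Some(CronLine {
        time_fields: [tokens[0], tokens[1], tokens[2], tokens[3], tokens[4]],
        task_identifier: tokens[5],
        extra: tokens[6..].to_vec(),
    })
}
```

Splitting on whitespace is enough to show the layout, but a real parser must treat a trailing JSON payload as a single unit, which this sketch does not attempt.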
Database And Migrations
The database abstraction lives in graphile_worker_database.
Important public types:
- Database, DatabaseDriver, DbTransaction, and TransactionDriver.
- DbExecutor and DbExecutorArg.
- DbError.
- Schema.
- Notification and NotificationStream.
- DbCell, DbParams, DbRow, DbValue, and FromDbCell.
- escape_identifier.
Driver modules are feature gated:
- graphile_worker_database::sqlx with the driver-sqlx feature.
- graphile_worker_database::tokio_postgres with the driver-tokio-postgres feature.
Migrations live in graphile_worker_migrations.
Important public items:
- migrate - runs Graphile Worker schema migrations.
- MigrateError - migration error type.
- GraphileWorkerMigration - migration metadata type.
- pg_version and sql modules.
The migration support crates are also public:
- graphile_worker_migrations_core - GraphileWorkerMigration.
- graphile_worker_migrations_macros - include_migrations!.
Lifecycle Hooks
Lifecycle hook APIs live in graphile_worker_lifecycle_hooks and are re-exported by graphile_worker.
Important public types:
- HookRegistry - registry for hook handlers.
- Plugin - plugin trait for registering hooks.
- Event, HookOutput, and Interceptable - hook event traits.
- TypeErasedHooks - erased hook registry.
- HookResult and event-specific result types such as JobScheduleResult.
- Event marker and context types exported from the crate's events and context modules.
Workers can register individual handlers with WorkerOptions::on or register a
plugin with WorkerOptions::add_plugin.
use graphile_worker::{BeforeJobRun, HookResult, JobStart, WorkerOptions};
let options = WorkerOptions::default()
    .on(JobStart, |ctx| async move {
        println!("starting job {}", ctx.job.id());
    })
    .on(BeforeJobRun, |ctx| async move {
        // Skip any job whose payload contains "skip": true.
        if ctx.payload.get("skip").and_then(|value| value.as_bool()) == Some(true) {
            return HookResult::Skip;
        }
        HookResult::Continue
    });
Recovery And Shutdown
Recovery APIs are exposed from graphile_worker_recovery and re-exported by the root crate where they are part of the worker API.
Important public types:
- WorkerRecoveryConfig - dead worker recovery settings.
- SweepStaleWorkersOptions and SweepStaleWorkersResult - manual sweep input and output.
- ResolvedSweepConfig - resolved sweep settings.
- ActiveWorkerRow - heartbeat row returned by worker listing APIs.
- INFRASTRUCTURE_RESILIENT_FLAG and job_has_resilient_flag - infrastructure-resilient job flag support.
Shutdown signal support lives in graphile_worker_shutdown_signal:
- ShutdownSignal - cloneable shared future.
- shutdown_signal - OS shutdown signal detector.
Runtime
graphile_worker_runtime provides
runtime-neutral async building blocks used by the worker internals and exposed as
a public crate.
Important public items:
- channel, Receiver, and Sender.
- Mutex, RwLock, RwLockReadGuard, and RwLockWriteGuard.
- Notify and Notified.
- spawn, AbortHandle, JoinHandle, and JoinError.
- interval, sleep, sleep_until, timeout_at, Interval, and TimeoutError.
The crate requires either the runtime-tokio or runtime-async-std feature.
Admin Crates
Admin APIs are published as separate crates:
- graphile_worker_admin_api - shared admin request and response DTO modules, plus SQLx read-query helpers behind its sqlx feature.
- graphile_worker_admin_ui - native Axum admin server crate.
- graphile_worker_admin_ui_client - admin UI client crate, including manifest_dir and WASM-only client code.
Feature Flags
The root crate default feature set is:
default = ["runtime-tokio", "tls-rustls", "driver-sqlx"]
Runtime features:
- runtime-tokio
- runtime-async-std
TLS features:
- tls-rustls
- tls-native-tls
Database driver features:
- driver-sqlx
- driver-tokio-postgres
OpenTelemetry compatibility features:
- opentelemetry_0_30
- opentelemetry_0_31
- opentelemetry_0_32
Lower-Level Crates
These crates are public for advanced integration and internal composition, but
most applications use them through graphile_worker re-exports:
- graphile_worker_job - Job, JobBuilder, DbJob, and DbJobData.
- graphile_worker_job_spec - JobSpec, JobSpecBuilder, and JobKeyMode.
- graphile_worker_task_details - TaskDetails and SharedTaskDetails.
- graphile_worker_queries - lower-level query modules such as add_job, get_job, complete_job, fail_job, return_jobs, recover_workers, task_identifiers, and worker_heartbeat.
- graphile_worker_utils - WorkerUtils, client, and types.
Compatibility
Graphile Worker RS is a Rust implementation of the PostgreSQL-backed job queue used by Node Graphile Worker. It is designed to use the same database schema and can run side by side with Node workers when both runtimes understand the schema revision installed in PostgreSQL.
This page focuses on database and operational compatibility. Rust task handler code is not portable to Node.js, and Node task functions are not portable to Rust; the compatibility boundary is the jobs stored in PostgreSQL.
What Is Shared
Graphile Worker RS manages a PostgreSQL schema, graphile_worker by default.
The schema stores queued jobs, task identifiers, queue locks, migration state,
and optional worker recovery state.
The current migrations install the same public-facing objects as Node Graphile
Worker in the graphile_worker schema. Applications typically interact with these:
- add_job(...) schedules one job.
- add_jobs(...) schedules many jobs.
- remove_job(...) removes a keyed job.
- complete_jobs(...), reschedule_jobs(...), permanently_fail_jobs(...), and force_unlock_workers(...) administer jobs.
- jobs is a compatibility view over the private job tables.
- migrations is the migration ledger.
Internally, newer migrations store queue data in private tables such as
_private_jobs, _private_tasks, and _private_job_queues. Application code
should prefer the public SQL functions and the jobs view instead of depending
on the private tables directly.
Coexisting With Node Workers
Rust and Node workers can process jobs from the same schema when they register handlers for the same task identifiers and agree on the payload shape.
For example, a job scheduled through SQL can be picked up by any compatible
worker that has a handler for send_email:
select graphile_worker.add_job(
  identifier => 'send_email',
  payload => '{"to":"ada@example.com","subject":"Welcome"}'::json,
  queue_name => 'mailers',
  run_at => now(),
  max_attempts => 25,
  job_key => 'welcome:ada@example.com',
  priority => 0,
  flags => null,
  job_key_mode => 'replace'
);
The matching Rust handler must use the same task identifier:
use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
}
impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";
    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        Ok::<(), String>(())
    }
}
The job payload is stored as JSON. Keep payloads language-neutral: use stable field names, avoid Rust-only serialization assumptions, and make nullable or optional fields explicit when both Rust and Node workers may consume the same task.
Schema Selection
The default schema is graphile_worker, but Graphile Worker RS can be pointed at
another schema during worker setup:
let worker = graphile_worker::WorkerOptions::default()
    .schema("graphile_worker")
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;
Use the same schema name when you want Rust and Node workers to share a queue. Use separate schemas when you want isolation, for example during a staged migration or when two applications have unrelated task identifiers.
Important Differences
Graphile Worker RS is mostly compatible with Node Graphile Worker, but it is not the same process model or API surface.
The README documents one runtime-level difference: Node Graphile Worker gives
each process its own worker_id, while Graphile Worker RS uses one worker id per
worker instance and processes jobs within the async runtime according to the
configured concurrency.
The Rust crate exposes Rust-specific APIs such as WorkerOptions,
TaskHandler, WorkerUtils, runtime feature selection, typed job definitions,
and optional recovery configuration. Those are Rust conveniences around the
shared database model, not Node APIs.
Graphile Worker RS also adds recovery support in the current schema through
_private_workers and SQL functions such as worker_heartbeat,
worker_deregister, list_stale_workers, list_orphan_locked_workers,
recover_dead_worker_jobs, and delete_stale_workers. Recovery is opt-in in
the Rust worker configuration. If you run mixed Rust and Node workers, enable
and test recovery behavior deliberately so that every worker process in the
shared schema is compatible with your operational expectations.
Migration Behavior
Graphile Worker RS runs ordered migrations and records them in
graphile_worker.migrations. The migration set is idempotent: tests install a
fresh schema, add a job, rerun migrations multiple times, and verify that the job
remains present.
The migration code can also take over an existing schema that already has a
Graphile Worker-style migrations table. Tests cover a pre-existing migrations
table containing migration 1, then run the Rust migrations, add a job, and
rerun migrations without losing that job.
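The idempotency described above follows from recording each applied migration in a ledger. The std-only sketch below is a hypothetical model in which a migration id runs only if the ledger does not already contain it, so a second pass finds nothing to do. It ignores transactions, locking, and the compatibility checks below, and `pending_migrations` is an illustrative name, not a crate function.

```rust
use std::collections::BTreeSet;

/// Migration ids that still need to run, in ascending order, given every id
/// this code knows about and the ids already recorded in the ledger.
pub fn pending_migrations(known: &[u32], ledger: &BTreeSet<u32>) -> Vec<u32> {
    let mut pending: Vec<u32> = known
        .iter()
        .copied()
        .filter(|id| !ledger.contains(id))
        .collect();
    // Apply in order and never twice, even if `known` was unsorted.
    pending.sort_unstable();
    pending.dedup();
    pending
}
```

Taking over a schema whose ledger already holds migration 1 corresponds to starting with `{1}` in the ledger. Only the later ids are pending, and once they are recorded a rerun is a no-op.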
There are two compatibility checks to plan for:
- If the database contains a breaking migration id newer than this Rust crate knows about, migration aborts instead of downgrading or guessing. Newer non-breaking migration ids log a warning and continue.
- Migration 11 refuses to run while recent jobs are locked. Tests set locked_at and locked_by on an existing job and expect the migration to fail with a locked-job error.
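Both checks can be pictured as a preflight step before any migration runs. The std-only sketch below mirrors the two rules as stated and is not the crate's code. The `breaking` flag on a ledger row, the count of recently locked jobs, and the names `RecordedMigration`, `Preflight`, and `preflight` are all assumptions made for illustration.

```rust
/// A migration recorded in the database ledger (simplified, hypothetical).
pub struct RecordedMigration {
    pub id: u32,
    pub breaking: bool,
}

#[derive(Debug, PartialEq)]
pub enum Preflight {
    Proceed,
    /// Newer, non-breaking ids this code does not know about.
    ProceedWithWarning(Vec<u32>),
    Abort(String),
}

pub fn preflight(
    latest_known: u32,
    recorded: &[RecordedMigration],
    recently_locked_jobs: usize,
) -> Preflight {
    // Ledger entries newer than anything this code knows about.
    let newer: Vec<&RecordedMigration> =
        recorded.iter().filter(|m| m.id > latest_known).collect();
    if let Some(m) = newer.iter().find(|m| m.breaking) {
        return Preflight::Abort(format!(
            "breaking migration {} is newer than {}",
            m.id, latest_known
        ));
    }
    // Migration 11 is pending if this code knows it but the ledger lacks it.
    let needs_11 = latest_known >= 11 && !recorded.iter().any(|m| m.id == 11);
    if needs_11 && recently_locked_jobs > 0 {
        return Preflight::Abort(format!(
            "{} locked job(s) block migration 11",
            recently_locked_jobs
        ));
    }
    if newer.is_empty() {
        Preflight::Proceed
    } else {
        Preflight::ProceedWithWarning(newer.iter().map(|m| m.id).collect())
    }
}
```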
For production coexistence, treat schema upgrades as a coordinated operation:
- Check that every Rust and Node worker version you plan to run supports the target database schema revision.
- Drain or stop workers before migrations that rewrite job tables or private structures.
- Confirm there are no active locks before crossing migration 11.
- Start one runtime first, schedule a simple job, and verify the other runtime can see or process jobs with the same task identifier before moving real traffic.
Migration Strategies
For gradual migration from Node to Rust, prefer one of these approaches:
- Shared schema: keep Node workers running, add Rust workers with handlers for a small set of task identifiers, and move task ownership incrementally.
- Separate schema: run Rust workers in a different schema while validating new task implementations, then move scheduling code to the shared schema when the payload and retry behavior are proven.
- SQL scheduling boundary: schedule jobs through graphile_worker.add_job or add_jobs from application code so the producer does not depend on either runtime's client API.
When sharing a schema, avoid registering different behavior for the same task identifier in both runtimes unless the handlers are intentionally equivalent. PostgreSQL stores the task identifier, not the language runtime that should own the job.
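Because PostgreSQL records only the task identifier, one way to catch accidental double ownership is to compare the identifiers each runtime registers before a deploy. The std-only sketch below assumes you can produce both lists yourself, for example from your Rust define_job calls and your Node task list. `shared_identifiers` is a hypothetical helper.

```rust
use std::collections::BTreeSet;

/// Task identifiers registered by both runtimes, sorted and de-duplicated.
pub fn shared_identifiers<'a>(rust: &[&'a str], node: &[&'a str]) -> Vec<&'a str> {
    let rust: BTreeSet<&'a str> = rust.iter().copied().collect();
    let node: BTreeSet<&'a str> = node.iter().copied().collect();
    rust.intersection(&node).copied().collect()
}
```

An empty result means each identifier has a single owner. Any identifier that does show up should be one whose Rust and Node handlers are intentionally equivalent.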
Troubleshooting
This page collects common setup, runtime, and documentation issues with concrete checks you can run before digging into application-specific code.
Database Connection Problems
Graphile Worker RS needs a PostgreSQL connection. Most examples use a SQLx pool
and most local checks use DATABASE_URL.
Check that the URL is set and points at PostgreSQL:
echo "$DATABASE_URL"
For a local throwaway database that matches the test setup, the repository uses
PostgreSQL on port 54233:
docker run -d --name graphile-worker-rs-test \
  -p 54233:5432 \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_DB=postgres \
  postgres
Then use:
export DATABASE_URL='postgres://postgres:postgres@localhost:54233/postgres'
If a command cannot connect, verify that the container is ready:
docker exec graphile-worker-rs-test pg_isready -U postgres -h localhost
Migrations Have Not Run
If jobs are not visible, worker startup fails, or CLI commands fail against an empty database, run the Graphile Worker migrations for the target database.
Using the CLI:
graphile-worker --database-url postgres://postgres:postgres@localhost/postgres migrate
Or with DATABASE_URL:
DATABASE_URL=postgres://postgres:postgres@localhost/postgres graphile-worker migrate
By default, the CLI uses the graphile_worker schema. See
Migrations and CLI for
more operational detail.
Runtime Or Feature Mismatches
The default crate features are Tokio, rustls TLS, and the SQLx driver:
graphile_worker = { version = "0.13" }
If you disable default features, choose one runtime and one supported driver. For async-std with SQLx, include the async-std runtime feature and a TLS feature:
graphile_worker = { version = "0.13", default-features = false, features = ["runtime-async-std", "driver-sqlx", "tls-rustls"] }
async-std = { version = "1", features = ["attributes"] }
For Tokio with the tokio-postgres driver:
graphile_worker = { version = "0.13", default-features = false, features = ["runtime-tokio", "driver-tokio-postgres"] }
The repository test matrix only runs driver-tokio-postgres with
runtime-tokio. If you see feature errors, compare your feature list with
Runtime and Drivers and
Features.
Worker Starts But A Job Does Not Run
First check that the task identifier used when adding the job exactly matches the registered handler:
impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";
    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        Ok::<(), String>(())
    }
}
The worker must register that handler before init():
let worker = WorkerOptions::default()
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;
Then add jobs with the same identifier or with the type-safe helper:
worker
    .create_utils()
    .add_raw_job(
        "send_email",
        serde_json::json!({ "to": "user@example.com" }),
        Default::default(),
    )
    .await?;
If jobs are scheduled for the future, they will not run until their run_at
time. If jobs share a queue name, jobs in that queue run in series rather than
in parallel. See Tasks, Scheduling,
and Queues.
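The two rules above can be combined into a simple eligibility check. In this model, a job can start only once its run_at has passed, and at most one job per named queue runs at a time, while jobs without a queue are independent. The std-only sketch below assumes the input is already in the order the worker would try jobs (the real ordering also involves priority) and uses plain integer timestamps. `PendingJob` and `runnable_now` are hypothetical names, not crate APIs.

```rust
use std::collections::HashSet;

/// Simplified job record for illustration.
pub struct PendingJob<'a> {
    pub id: u64,
    pub queue_name: Option<&'a str>,
    pub run_at: u64,
    pub locked: bool,
}

/// Ids of jobs that could start at `now`, in input order.
/// `busy_queues` holds queues that already have a running job.
pub fn runnable_now<'a>(
    jobs: &[PendingJob<'a>],
    now: u64,
    busy_queues: &HashSet<&'a str>,
) -> Vec<u64> {
    let mut claimed: HashSet<&str> = HashSet::new();
    let mut picked = Vec::new();
    for job in jobs {
        // Already running, or scheduled for the future.
        if job.locked || job.run_at > now {
            continue;
        }
        if let Some(queue) = job.queue_name {
            // Queues run in series: skip if busy or already picked from.
            if busy_queues.contains(queue) || !claimed.insert(queue) {
                continue;
            }
        }
        picked.push(job.id);
    }
    picked
}
```

With two ready jobs in a "mailers" queue, only the first is picked. A future-dated job stays pending until `now` reaches its run_at.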
Payload Deserialization Failures
Task payloads are deserialized into the task struct. If the payload shape does not match the struct (for example, a required field is missing or has the wrong JSON type), deserialization fails and the handler never receives the data you expect.
Check the struct fields and the JSON payload side by side:
#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
    body: String,
}
SELECT graphile_worker.add_job(
  'send_email',
  json_build_object(
    'to', 'user@example.com',
    'subject', 'Welcome',
    'body', 'Thanks for signing up.'
  )
);
When adding jobs from Rust, prefer the type-safe add_job path when the task
type is available:
utils
    .add_job(
        SendEmail {
            to: "user@example.com".to_string(),
            subject: "Welcome".to_string(),
            body: "Thanks for signing up.".to_string(),
        },
        Default::default(),
    )
    .await?;
Shutdown Signal Conflicts
By default, Graphile Worker installs OS-level shutdown listeners such as
SIGINT and SIGTERM so it can drain gracefully. If your application already
owns shutdown handling, disable the built-in listeners and pass your own
shutdown future:
// on_shutdown() stands for your application's own shutdown future.
let shutdown = WorkerShutdownConfig::default()
    .listen_os_shutdown_signals(false)
    .shutdown_signal(on_shutdown());
let worker = WorkerOptions::default()
    .worker_shutdown(shutdown)
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;
See Shutdown for the configuration surface.
Locked Jobs After A Crash
Worker recovery is disabled by default. For deployments where jobs may remain locked after a process crash, network partition, forced abort, or orchestrator shutdown, enable recovery:
let recovery = WorkerRecoveryConfig::default()
    .enabled(true)
    .heartbeat_interval(Duration::from_secs(30))
    .sweep_interval(Duration::from_secs(60))
    .sweep_threshold(Duration::from_secs(300))
    .recovery_delay(Duration::from_secs(30));
let worker = WorkerOptions::default()
    .worker_recovery(recovery)
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;
The CLI can also inspect and recover stale workers:
graphile-worker sweep-stale-workers --dry-run
graphile-worker sweep-stale-workers --sweep-threshold 5m --recovery-delay 30s
Use --dry-run first when you want to see which workers would be recovered
without unlocking or rescheduling jobs. See Recovery.
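The settings above interact through simple arithmetic. With a 30-second heartbeat_interval and a 300-second (5m) sweep_threshold, a worker would have to miss roughly ten consecutive heartbeats before a sweep treats it as stale. The std-only sketch below models that check. It assumes recovery_delay pushes a recovered job's next run that far into the future, and the comparison is "strictly older than the threshold". Both are assumptions, not confirmed crate behavior. `Heartbeat`, `stale_workers`, and `recovered_run_at` are hypothetical names.

```rust
use std::time::Duration;

/// Last heartbeat seen for a worker, in whole seconds since some epoch.
pub struct Heartbeat<'a> {
    pub worker_id: &'a str,
    pub last_seen_secs: u64,
}

/// Worker ids whose last heartbeat is older than `sweep_threshold`.
pub fn stale_workers<'a>(
    beats: &[Heartbeat<'a>],
    now_secs: u64,
    sweep_threshold: Duration,
) -> Vec<&'a str> {
    beats
        .iter()
        .filter(|b| now_secs.saturating_sub(b.last_seen_secs) > sweep_threshold.as_secs())
        .map(|b| b.worker_id)
        .collect()
}

/// When a job unlocked from a stale worker would next become runnable,
/// assuming recovery_delay postpones it from the time of the sweep.
pub fn recovered_run_at(now_secs: u64, recovery_delay: Duration) -> u64 {
    now_secs + recovery_delay.as_secs()
}
```

Keeping sweep_threshold several heartbeat intervals long makes it unlikely that a slow but healthy worker is swept by mistake.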
CLI Checks
The installed binary is graphile-worker. It connects with --database-url or
DATABASE_URL.
Useful checks:
graphile-worker list --state ready
graphile-worker show 123
graphile-worker stats
graphile-worker queues
graphile-worker workers
Useful repair commands:
graphile-worker fail 125 --reason "invalid payload"
graphile-worker reschedule 126 --run-at 2026-01-02T03:04:05Z
graphile-worker force-unlock graphile_worker_deadbeef
graphile-worker cleanup
Use the same database URL and schema that your worker uses; otherwise, the CLI will inspect a different queue.
Local Test Environment Checks
The repository has two common test paths:
just test
This runs cargo test --all and requires DATABASE_URL to point at a usable
PostgreSQL database.
just test-docker
This starts a postgres container named graphile-worker-rs-test, waits for
pg_isready, runs tests with
DATABASE_URL='postgres://postgres:postgres@localhost:54233/postgres', and then
removes the container.
If Docker reports that the container name or port is already in use, remove the
old test container or stop the process using port 54233:
docker rm -f graphile-worker-rs-test
Runtime-specific test helpers also exist:
just test-docker-runtime runtime-tokio driver-sqlx
just test-docker-runtime runtime-async-std driver-sqlx
just test-docker-runtime runtime-tokio driver-tokio-postgres
Documentation Build Checks
Documentation commands are defined in the repository justfile:
just docs-install
just docs
just docs-serve
just docs-install installs mdbook with version ^0.4. just docs runs
mdbook build docs. just docs-serve serves the book on 127.0.0.1:3000 by
default.