Architecture
Graphile Worker RS is a PostgreSQL-backed job queue. Application code writes jobs into the Graphile Worker schema, worker processes lock ready jobs with PostgreSQL row locks, task handlers run in the configured async runtime, and the worker persists completion, retry, failure, or recovery state back to the database.
The public crate is split into a small set of user-facing concepts:
- WorkerOptions configures a worker, registers task handlers, and creates a Worker.
- Worker runs the job loop until shutdown.
- WorkerUtils adds and manages jobs outside the worker loop.
- TaskHandler defines the Rust code that handles a job payload.
- JobSpec configures scheduling, queues, attempts, priority, flags, and job keys for inserted jobs.
Internally, the root crate coordinates smaller workspace crates. The query crate owns SQL construction, the job/task/spec crates define shared data structures, the lifecycle hook crate exposes extension points, and the runtime facade keeps the worker logic independent of a specific async runtime feature.
Data Flow
The normal path from enqueue to release is:
- Application code adds a job through WorkerUtils or the lower-level add_job SQL wrapper.
- PostgreSQL stores the job in the worker schema and emits the database notification used by listeners.
- A running worker receives a signal from LISTEN jobs:insert, periodic polling, run_once, or an internal local-queue signal.
- Worker tasks fetch ready jobs with for update skip locked, filtered to the task identifiers registered in this worker.
- The worker builds a WorkerContext, runs the matching TaskHandler, and catches task errors and panics.
- The release path deletes completed jobs, reschedules retryable failures, or returns interrupted jobs for recovery.
application
|
| add_job(identifier, payload, JobSpec)
v
PostgreSQL worker schema
|
| LISTEN/NOTIFY or polling
v
Worker job loop
|
| get_job / batch_get_jobs
v
TaskHandler::run(WorkerContext)
|
+-- success ----------> complete_job: delete job, unlock queue
|
+-- task failure -----> fail_job: save error, unlock, retry later
|
+-- shutdown abort ---> recovery return: decrement attempt, unlock, delay
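The three branches of the diagram can be read as a mapping from how a handler run ended to the release path that follows. The sketch below models only that mapping; the type and function names (RunOutcome, ReleasePath, release_path) are hypothetical and are not taken from the crate.

```rust
/// How a handler run ended, as seen by the worker (hypothetical names).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunOutcome {
    Success,
    TaskFailure,
    ShutdownAbort,
}

/// The release path taken for each outcome, mirroring the diagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReleasePath {
    /// complete_job: delete the job row and unlock its queue.
    Complete,
    /// fail_job: save the error, unlock, and retry later.
    Fail,
    /// recovery return: decrement attempts, unlock, and optionally delay.
    Recover,
}

/// Pick the release path for a finished (or interrupted) run.
fn release_path(outcome: RunOutcome) -> ReleasePath {
    match outcome {
        RunOutcome::Success => ReleasePath::Complete,
        RunOutcome::TaskFailure => ReleasePath::Fail,
        RunOutcome::ShutdownAbort => ReleasePath::Recover,
    }
}
```

Keeping the shutdown abort as its own variant, rather than folding it into the failure case, is what lets the recovery path skip failure backoff.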
Adding Jobs
The query layer inserts jobs by calling the schema's add_job database
function with the task identifier, JSON payload, and JobSpec fields:
let job = worker
    .create_utils()
    .add_raw_job(
        "send_email",
        serde_json::json!({ "to": "a@example.com" }),
        graphile_worker::JobSpec::default(),
    )
    .await?;
The SQL wrapper passes these options to PostgreSQL:
- identifier: the task name registered by a handler.
- payload: the JSON payload stored with the job.
- queue_name: optional queue serialization key.
- run_at: optional scheduled time.
- max_attempts: optional retry limit.
- job_key and job_key_mode: optional job de-duplication/update behavior.
- priority: lower values are fetched first.
- flags: labels that workers may skip through forbidden_flags.
When tracing context is active, the insert path can add trace information into
the payload before writing it. If use_local_time is enabled and no run_at
is supplied, the Rust process time is used as the job's run time; otherwise the
SQL layer uses database time.
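The run-time rule above has three cases, which a small decision function makes explicit. This is a sketch under assumptions: RunAt and resolve_run_at are hypothetical names, and SystemTime stands in for whatever timestamp type the crate actually uses.

```rust
use std::time::SystemTime;

/// What the insert path sends to PostgreSQL for the job's run time.
#[derive(Debug, PartialEq)]
enum RunAt {
    /// Send this explicit timestamp.
    Explicit(SystemTime),
    /// Send nothing and let the SQL layer use database time.
    DatabaseNow,
}

/// Resolve the run time from the requested run_at and the use_local_time setting.
fn resolve_run_at(
    requested: Option<SystemTime>,
    use_local_time: bool,
    local_now: SystemTime,
) -> RunAt {
    match (requested, use_local_time) {
        // A supplied run_at always wins.
        (Some(at), _) => RunAt::Explicit(at),
        // No run_at, local time enabled: use the Rust process clock.
        (None, true) => RunAt::Explicit(local_now),
        // No run_at, local time disabled: defer to the database clock.
        (None, false) => RunAt::DatabaseNow,
    }
}
```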
Fetching Jobs
Workers do not scan every job type. During initialization they know the task
identifiers they can run, and fetch only jobs whose task_id is in that set.
The fetch query requires:
- is_available = true
- run_at <= now(), or the supplied local timestamp
- no forbidden flags selected by this worker
- an available queue when the job belongs to a named queue
Jobs are ordered by priority asc, run_at asc, then locked with:
for update
skip locked
After selecting a row, the same query increments attempts, sets locked_by
to the worker id, and records locked_at. For queued jobs, the related
job_queues row is also locked by the worker. This is the core coordination
mechanism that lets multiple worker processes share the same PostgreSQL queue
without taking the same job.
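To make the filter, ordering, and locking steps concrete, here is an in-memory model of a single fetch. It is not the SQL the crate runs: the Job struct, the fetch_one function, and the integer clock are simplifications invented for this sketch, and an unlocked row stands in for both is_available and skip locked.

```rust
use std::collections::HashSet;

/// Simplified job row (hypothetical; the real table has more columns).
#[derive(Debug, Clone)]
struct Job {
    id: u64,
    task_id: u32,
    queue: Option<String>,
    priority: i16,
    run_at: u64, // seconds on a simplified clock
    flags: Vec<String>,
    attempts: u32,
    locked_by: Option<String>,
}

/// Select the best ready job, then mark it as locked by `worker_id`.
fn fetch_one(
    jobs: &mut [Job],
    task_ids: &HashSet<u32>,
    forbidden_flags: &HashSet<String>,
    locked_queues: &mut HashSet<String>,
    now: u64,
    worker_id: &str,
) -> Option<u64> {
    let idx = jobs
        .iter()
        .enumerate()
        // Rows locked by another worker are skipped, like `skip locked`.
        .filter(|(_, j)| j.locked_by.is_none())
        // Only task identifiers registered in this worker.
        .filter(|(_, j)| task_ids.contains(&j.task_id))
        .filter(|(_, j)| j.run_at <= now)
        .filter(|(_, j)| !j.flags.iter().any(|f| forbidden_flags.contains(f)))
        // A job in a named queue needs that queue to be unlocked.
        .filter(|(_, j)| j.queue.as_ref().map_or(true, |q| !locked_queues.contains(q)))
        // priority asc, run_at asc.
        .min_by_key(|(_, j)| (j.priority, j.run_at))
        .map(|(i, _)| i)?;

    let job = &mut jobs[idx];
    job.attempts += 1;
    job.locked_by = Some(worker_id.to_string());
    if let Some(queue) = &job.queue {
        locked_queues.insert(queue.clone());
    }
    Some(job.id)
}
```

Calling fetch_one repeatedly with different worker ids never returns the same job twice, which is the property the row locks provide in PostgreSQL.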
Worker Loop
Worker::run() performs the runtime lifecycle in this order:
- Register the worker when recovery support is enabled.
- Emit worker start hooks.
- Spawn recovery background tasks.
- Run the crontab scheduler and job runner together.
- Wait for completion and failure batchers to flush on shutdown.
- Stop recovery tasks, emit shutdown hooks, and deregister the worker.
The job runner listens for job signals. A signal can come from the PostgreSQL
listener, the polling interval, run_once, or the local queue. Each signal is
fanned out to the configured concurrency so idle worker tasks can attempt to
fetch jobs. Signals are wakeup hints rather than durable work items, so the
dispatcher coalesces them when worker tasks are already saturated. This keeps
PostgreSQL notification listeners drained while polling still provides the
fallback path for missed or future work.
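One way to picture fan-out with coalescing is a bounded wakeup channel whose capacity equals the configured concurrency. The sketch below uses the standard library's synchronous channel as a stand-in for the async primitives a real worker would use; Dispatcher and signal are hypothetical names, and concurrency is assumed to be at least 1.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Turns job signals into wakeups for idle worker tasks.
struct Dispatcher {
    wakeups: SyncSender<()>,
    concurrency: usize,
}

impl Dispatcher {
    /// Create a dispatcher and the receiving end that worker tasks wait on.
    fn new(concurrency: usize) -> (Self, Receiver<()>) {
        let (wakeups, rx) = sync_channel(concurrency);
        (Self { wakeups, concurrency }, rx)
    }

    /// Fan one signal out to up to `concurrency` wakeups.
    /// Returns how many wakeups were queued; the rest are coalesced.
    fn signal(&self) -> usize {
        let mut queued = 0;
        for _ in 0..self.concurrency {
            // try_send never blocks, so the signal source is always drained.
            if self.wakeups.try_send(()).is_err() {
                break; // channel full (or closed): drop the extra wakeups
            }
            queued += 1;
        }
        queued
    }
}
```

Dropping surplus wakeups is safe here because a wakeup carries no job data; a worker task that wakes up simply tries to fetch, and polling covers anything that was missed.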
There are two fetch modes:
- Direct mode fetches one job per worker task with get_job.
- Local queue mode prefetches jobs with batch_get_jobs and wakes worker tasks through an internal signal when cached jobs are available.
Both modes end in the same execution and release logic.
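The difference between the two modes is where the next job comes from. A minimal sketch of the local queue idea follows, assuming a cache that is refilled by a batch fetch whenever it runs dry; LocalQueue here is a toy type written for this example, job ids stand in for full job rows, and the fetch_batch closure stands in for batch_get_jobs.

```rust
use std::collections::VecDeque;

/// Toy prefetch cache (hypothetical; not the crate's LocalQueue type).
struct LocalQueue {
    cache: VecDeque<u64>,
    batch_size: usize,
}

impl LocalQueue {
    fn new(batch_size: usize) -> Self {
        Self { cache: VecDeque::new(), batch_size }
    }

    /// Hand out the next cached job id, refilling the cache with one
    /// batch fetch when it is empty.
    fn next_job(&mut self, mut fetch_batch: impl FnMut(usize) -> Vec<u64>) -> Option<u64> {
        if self.cache.is_empty() {
            self.cache.extend(fetch_batch(self.batch_size));
        }
        self.cache.pop_front()
    }
}
```

Direct mode corresponds to a batch size of one with no cache: every worker task pays one database round trip per job.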
Running a Task
After a job is fetched, the worker looks up the registered handler by the job's task identifier. If the identifier cannot be resolved or no handler function is registered for it, the job is released through the failure path.
For a valid job, the worker creates a WorkerContext containing the shared
job, database handle, schema, worker id, extensions, task details, and time
mode. It then runs the handler future:
impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        // use ctx and the deserialized payload here
        Ok::<(), String>(())
    }
}
The execution path records task duration for successful jobs, catches panics, and watches the worker shutdown signal. If shutdown is requested, the worker waits for the configured grace period. A task that has not finished by then is treated as aborted and goes through the recovery release path.
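Catching panics means a misbehaving handler becomes a failed job rather than a crashed worker. The sketch below shows the idea with a synchronous closure and std::panic::catch_unwind; the real worker runs async futures, so this is a stand-in for the technique, and Outcome and run_guarded are hypothetical names.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::time::{Duration, Instant};

/// Result of one guarded handler run (hypothetical type).
#[derive(Debug, PartialEq)]
enum Outcome {
    Succeeded,
    Failed(String),
    Panicked(String),
}

/// Run a handler, converting a panic into an ordinary outcome and
/// measuring how long the run took.
fn run_guarded<F>(handler: F) -> (Outcome, Duration)
where
    F: FnOnce() -> Result<(), String>,
{
    let started = Instant::now();
    let outcome = match catch_unwind(AssertUnwindSafe(handler)) {
        Ok(Ok(())) => Outcome::Succeeded,
        Ok(Err(message)) => Outcome::Failed(message),
        Err(payload) => {
            // Panic payloads are usually a &str or a String.
            let message = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            Outcome::Panicked(message)
        }
    };
    (outcome, started.elapsed())
}
```

Both Failed and Panicked would go through the failure release path; only a run cut off by the shutdown grace period takes the recovery path.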
Completion and Failure
Successful jobs are completed by deleting the job row. If the job belongs to a
queue, completion also clears locked_by and locked_at on the queue row, but
only when that queue is locked by the same worker id.
Failed jobs are updated instead of deleted:
- last_error is set to the persisted error message.
- run_at is moved forward by exponential backoff based on attempts.
- locked_by and locked_at are cleared.
- a replacement payload may be written when the handler returned one.
- queued jobs also unlock their queue row.
The worker decides whether a failure will retry by comparing attempts with
max_attempts. Retryable failures emit job-fail hooks; exhausted jobs emit
permanent-failure hooks. The SQL update has the same shape in both cases: a
permanently failed job is left unlocked with its last error recorded, and the
schema's availability rules stop it from being fetched again once its attempt
limit is reached.
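The retry decision and the backoff delay can be sketched as two small functions. The delay formula is an assumption: it follows the exp(least(10, attempts)) seconds rule documented for upstream Graphile Worker, while this document only states that the backoff is exponential in attempts. FailureKind, classify_failure, and retry_delay are hypothetical names.

```rust
use std::time::Duration;

/// Assumed cap on the exponent, taken from upstream Graphile Worker.
const MAX_BACKOFF_EXPONENT: u32 = 10;

#[derive(Debug, PartialEq, Eq)]
enum FailureKind {
    /// Emit job-fail hooks; the job becomes available again after the delay.
    WillRetry,
    /// Emit permanent-failure hooks; the job is never fetched again.
    Permanent,
}

/// `attempts` was already incremented when the job was fetched, so it
/// counts the run that just failed.
fn classify_failure(attempts: u32, max_attempts: u32) -> FailureKind {
    if attempts >= max_attempts {
        FailureKind::Permanent
    } else {
        FailureKind::WillRetry
    }
}

/// Delay added to run_at after a failure: e^attempts seconds, capped (assumed formula).
fn retry_delay(attempts: u32) -> Duration {
    let exponent = attempts.min(MAX_BACKOFF_EXPONENT) as f64;
    Duration::from_secs_f64(exponent.exp())
}
```

Under this assumed formula the first retry waits about 2.7 seconds and the delay stops growing after the tenth attempt, at roughly six hours.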
Completion and ordinary failures can be batched. The completion and failure batchers receive release requests through bounded channels, flush them after the configured delay, and emit hooks only for rows that were actually persisted. If a batcher is already closed, the worker falls back to direct persistence for that job.
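The batching behavior can be sketched with a bounded standard-library channel: the worker side enqueues release requests and falls back to direct persistence when the batcher has gone away, and the batcher side collects requests until the flush delay expires. The function names and the use of job ids as requests are assumptions made for this example; the real batchers are async.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender};
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Eq)]
enum Released {
    Batched,
    Direct,
}

/// Worker side: hand the job to the batcher, or persist it directly
/// when the batcher is already closed.
fn release(tx: &SyncSender<u64>, job_id: u64, mut persist_directly: impl FnMut(u64)) -> Released {
    match tx.send(job_id) {
        Ok(()) => Released::Batched,
        Err(closed) => {
            // send only fails when the receiver was dropped.
            persist_directly(closed.0);
            Released::Direct
        }
    }
}

/// Batcher side: wait for the first request, then keep collecting
/// until `delay` has passed or every sender is gone.
fn collect_batch(rx: &Receiver<u64>, delay: Duration) -> Vec<u64> {
    let mut batch = Vec::new();
    match rx.recv() {
        Ok(job_id) => batch.push(job_id),
        Err(_) => return batch, // channel closed with nothing queued
    }
    let deadline = Instant::now() + delay;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(job_id) => batch.push(job_id),
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    batch
}
```

The batch returned by collect_batch would then be persisted in one statement, with hooks emitted only for the rows the database reports as updated.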
Shutdown and Recovery
Shutdown aborts are intentionally not normal task failures. When a running job is interrupted by the shutdown grace-period timeout, the worker applies job recovery instead of failure backoff:
- attempts is decremented with a floor of zero.
- locked_by and locked_at are cleared.
- run_at is delayed by interrupted_job_retry_delay when configured.
- queued jobs also unlock their queue row.
- interruption hooks are emitted when recovery handled the job.
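Applied to a single row, the recovery rules look like the sketch below. LockedJob and return_interrupted are hypothetical names, and measuring the delay from the current time (rather than from the old run_at) is an assumption of this example.

```rust
use std::time::{Duration, SystemTime};

/// The columns touched by recovery (hypothetical, simplified row).
#[derive(Debug, Clone, PartialEq)]
struct LockedJob {
    attempts: u32,
    run_at: SystemTime,
    locked_by: Option<String>,
    locked_at: Option<SystemTime>,
}

/// Return an interrupted job to the queue without counting a failure.
fn return_interrupted(job: &mut LockedJob, now: SystemTime, retry_delay: Option<Duration>) {
    // Reverse the attempt consumed by the fetch, never going below zero.
    job.attempts = job.attempts.saturating_sub(1);
    job.locked_by = None;
    job.locked_at = None;
    // Only move run_at when a delay is configured (assumed to start at `now`).
    if let Some(delay) = retry_delay {
        job.run_at = now + delay;
    }
}
```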
Automatic worker recovery is separate and opt-in. When enabled, workers record heartbeats, and the recovery sweeper finds stale worker ids or orphaned locks. Recovered jobs are returned to the queue through the same recovery semantics: the attempt consumed by fetching the job is reversed, locks are cleared, and a recovery delay can be applied before the job is visible again.
The recovery SQL layer can also fetch locked jobs for a set of worker ids and
call the schema's recover_dead_worker_jobs function to recover jobs owned by
dead workers.
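The sweeper's first step, deciding which workers count as dead, can be sketched as a heartbeat age check. The map of heartbeats, the threshold parameter, and the stale_workers function are assumptions for illustration; the crate's actual staleness rule is not described here.

```rust
use std::collections::HashMap;
use std::time::{Duration, SystemTime};

/// Return the ids of workers whose last heartbeat is older than `threshold`,
/// sorted for stable output.
fn stale_workers(
    heartbeats: &HashMap<String, SystemTime>,
    now: SystemTime,
    threshold: Duration,
) -> Vec<String> {
    let mut stale: Vec<String> = heartbeats
        .iter()
        // A heartbeat in the future yields Err and is treated as fresh.
        .filter(|(_, last)| now.duration_since(**last).map_or(false, |age| age > threshold))
        .map(|(id, _)| id.clone())
        .collect();
    stale.sort();
    stale
}
```

The resulting ids are the kind of input the recovery SQL layer needs to look up locked jobs and return them to the queue.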
Workspace Layout
The workspace keeps behavior separated by responsibility:
src/
  lib.rs      crate exports and top-level module structure
  runner/     worker lifecycle, job loop, execution, release
  batcher/    completion and failure batching
  streams/    job signal and job fetch streams
crates/
  graphile-worker-queries/
    src/add_job/            SQL wrappers for inserting jobs
    src/get_job.rs          single-job locking fetch
    src/batch_get_jobs.rs   batched locking fetch for LocalQueue
    src/complete_job.rs     completion persistence
    src/fail_job/           failure persistence
    src/return_jobs/        recovery and interrupted-job return
    src/recover_workers.rs  stale-worker recovery queries
This layout keeps most PostgreSQL statements in
graphile-worker-queries, while src/runner owns the orchestration decisions:
when to fetch, how to execute handlers, which release path to use, and when to
flush background components.