Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Worker Recovery

Worker recovery is an opt-in safety mechanism for jobs that remain locked after a worker stops unexpectedly. It is useful when a process crashes, is aborted, loses database connectivity, or is terminated by an orchestrator before it can release its in-flight jobs.

When recovery is enabled, workers write heartbeat rows to PostgreSQL. A sweeper then looks for workers whose heartbeat is stale and returns their locked jobs to the queue.

Enabling Recovery

Recovery is disabled by default. You can enable it with WorkerRecoveryConfig:

use graphile_worker::{WorkerOptions, WorkerRecoveryConfig};
use std::time::Duration;

let recovery = WorkerRecoveryConfig::default()
    .enabled(true)
    .heartbeat_interval(Duration::from_secs(30))
    .sweep_interval(Duration::from_secs(60))
    .sweep_threshold(Duration::from_secs(300))
    .recovery_delay(Duration::from_secs(30));

let worker = WorkerOptions::default()
    .worker_recovery(recovery)
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

The convenience setters on WorkerOptions also enable recovery:

use graphile_worker::WorkerOptions;
use std::time::Duration;

let worker = WorkerOptions::default()
    .heartbeat_interval(Duration::from_secs(30))
    .sweep_interval(Duration::from_secs(60))
    .sweep_threshold(Duration::from_secs(300))
    .recovery_delay(Duration::from_secs(30))
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

Configuration

WorkerRecoveryConfig::default() uses these values:

OptionDefaultMeaning
enabledfalseWhether heartbeat registration and sweeping are active.
heartbeat_interval30 secondsHow often an enabled worker updates its heartbeat row.
sweep_interval60 secondsHow often the background sweeper checks for inactive workers.
sweep_threshold5 minutesHow old a heartbeat must be before the worker is considered inactive.
recovery_delay30 secondsDelay before recovered jobs are eligible to run again.
resilient_sweep_threshold_multiplier3Multiplier applied to sweep_threshold for workers holding resilient jobs.
resilient_job_flags["infrastructure_resilient"]Job flags that activate the extended threshold.

Each builder method on WorkerRecoveryConfig stores the new value and enables recovery, except .enabled(false), which explicitly disables it again.

Heartbeats

An enabled worker registers itself in the private workers table and refreshes last_heartbeat_at at the configured heartbeat_interval.

You can inspect registered workers with WorkerUtils::list_active_workers. The stale state is calculated from the threshold you pass to that method:

use std::time::Duration;

let workers = utils
    .list_active_workers(Duration::from_secs(60))
    .await?;

for worker in workers {
    println!(
        "worker={} stale={} started_at={} last_heartbeat_at={}",
        worker.worker_id,
        worker.is_stale,
        worker.started_at,
        worker.last_heartbeat_at
    );
}

Sweeping Stale Workers

The background sweeper runs at sweep_interval. During a sweep it:

  • takes a transaction-scoped PostgreSQL advisory lock, so concurrent sweepers do not recover the same jobs twice;
  • finds workers whose heartbeat is older than sweep_threshold;
  • also finds orphan job locks whose locked_by worker is no longer registered;
  • unlocks the jobs held by those workers;
  • decrements the recovered jobs' attempt count back down;
  • releases queue locks for queued jobs;
  • moves run_at forward by recovery_delay;
  • records Job recovered after worker interruption as the recovered job error.

If another worker already holds the sweep lock, the sweep exits without recovering jobs.

Manual Sweeps

Use WorkerUtils::sweep_stale_workers when you want an operator command, admin endpoint, or scheduled task to run recovery directly.

use graphile_worker::{SweepStaleWorkersOptions, WorkerUtils};
use std::time::Duration;

let utils = WorkerUtils::new(database, "graphile_worker".to_string());

let result = utils
    .sweep_stale_workers(SweepStaleWorkersOptions {
        sweep_threshold: Some(Duration::from_secs(60)),
        recovery_delay: Some(Duration::from_secs(30)),
        dry_run: false,
    })
    .await?;

println!(
    "recovered {} jobs from {:?}",
    result.recovered_count,
    result.worker_ids
);

SweepStaleWorkersOptions can override sweep_threshold and recovery_delay for a single sweep. Leave either field as None to use the recovery config defaults. Set dry_run: true to return the worker IDs that would be considered dead without recovering jobs or deleting stale worker rows.

When you need custom resilient settings for a manual sweep, pass a WorkerRecoveryConfig explicitly:

use graphile_worker::{SweepStaleWorkersOptions, WorkerRecoveryConfig};
use std::time::Duration;

let config = WorkerRecoveryConfig::default()
    .sweep_threshold(Duration::from_secs(60))
    .resilient_sweep_threshold_multiplier(3);

let result = utils
    .sweep_stale_workers_with_config(&config, SweepStaleWorkersOptions {
        recovery_delay: Some(Duration::from_secs(30)),
        dry_run: false,
        ..Default::default()
    })
    .await?;

Resilient Jobs

Some jobs are expected to run for a long time. Recovery supports a resilient flag so those jobs are not reclaimed as quickly as ordinary work.

By default, a job is resilient when its flags include "infrastructure_resilient": true. Workers holding resilient jobs use:

effective threshold = sweep_threshold * resilient_sweep_threshold_multiplier

The default multiplier is 3.

use graphile_worker::{JobSpec, INFRASTRUCTURE_RESILIENT_FLAG};

let job = utils
    .add_job(
        LongRunningJob { id: 1 },
        JobSpec::builder()
            .flags(vec![INFRASTRUCTURE_RESILIENT_FLAG.to_string()])
            .build(),
    )
    .await?;

You can configure a different flag list:

use graphile_worker::WorkerRecoveryConfig;

let recovery = WorkerRecoveryConfig::default()
    .resilient_job_flags(vec!["custom_resilient".to_string()]);

A configured flag must be present and truthy on the job for the extended threshold to apply.

Recovery Hooks

Recovery hooks let your application decide what to do with each job recovered from a crashed worker. The hook receives the job, the recovering worker ID, the previous worker ID, and the interruption reason.

use graphile_worker::{
    FailureReason, HookRegistry, JobRecovery, JobRecoveryResult, SweepStaleWorkersOptions,
    WorkerUtils,
};
use std::sync::Arc;

let mut hooks = HookRegistry::new();

hooks.on(JobRecovery, |ctx| async move {
    if ctx.reason == FailureReason::WorkerCrashed {
        JobRecoveryResult::Default
    } else {
        JobRecoveryResult::Skip
    }
});

let utils = WorkerUtils::new(database, "graphile_worker".to_string())
    .with_hooks(Arc::new(hooks));

let result = utils
    .sweep_stale_workers(SweepStaleWorkersOptions::default())
    .await?;

The hook result controls recovery for that job:

ResultEffect
JobRecoveryResult::DefaultUnlock the job, decrement attempts, and delay it by recovery_delay.
JobRecoveryResult::Reschedule { run_at, attempts }Unlock the job, set run_at, and optionally replace attempts.
JobRecoveryResult::FailWithBackoffFail the job with the normal retry backoff and WorkerCrashed as the error.
JobRecoveryResult::SkipLeave the job locked and do not count it as recovered.

After a hook handles a job with any result except Skip, the normal job interrupted hook is emitted with FailureReason::WorkerCrashed. After a sweep finishes, worker recovery hooks are emitted with the recovered worker IDs and the recovered job count.