Lifecycle Hooks

Lifecycle hooks let you observe worker activity or intercept specific points in the job lifecycle. They are registered through plugins and run inside the worker process that owns the hook registry.

Use hooks for logging, metrics, validation, schedule-time payload adjustment, and local queue diagnostics. Keep hook handlers fast and focused: they run on the worker path they observe or intercept.
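
One way to keep a handler fast is to have it only enqueue an event and leave the slow work to a background consumer. The sketch below illustrates that pattern with the standard library alone. JobEvent, EventSink, and spawn_consumer are hypothetical names, not part of graphile_worker, and an async worker would likely prefer an async channel over a dedicated thread.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical event type for this sketch; not part of graphile_worker.
#[derive(Debug, Clone, PartialEq)]
pub struct JobEvent {
    pub job_id: i64,
    pub millis: u64,
}

/// Cheap, cloneable handle that a hook handler could own.
#[derive(Clone)]
pub struct EventSink {
    tx: Sender<JobEvent>,
}

impl EventSink {
    /// Called on the hot path. Sending on an unbounded std channel does not
    /// wait for the consumer.
    pub fn record(&self, event: JobEvent) {
        // A send error only means the consumer is gone; the event is dropped.
        let _ = self.tx.send(event);
    }
}

/// Spawns the background consumer. Here it only collects events; in practice
/// this is where a slow export or file write would happen.
pub fn spawn_consumer() -> (EventSink, JoinHandle<Vec<JobEvent>>) {
    let (tx, rx) = mpsc::channel::<JobEvent>();
    // The iterator ends once every EventSink (sender) has been dropped.
    let handle = thread::spawn(move || rx.into_iter().collect::<Vec<JobEvent>>());
    (EventSink { tx }, handle)
}
```

A handler would hold a clone of EventSink and call record(...) with the fields it cares about. Dropping every EventSink lets the consumer thread finish.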

Registering Hooks

A plugin implements Plugin and adds handlers to the HookRegistry:

use graphile_worker::{HookRegistry, JobComplete, Plugin, WorkerStart};

struct LoggingPlugin;

impl Plugin for LoggingPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(WorkerStart, async |ctx| {
            println!("worker {} started", ctx.worker_id);
        });

        hooks.on(JobComplete, async |ctx| {
            println!("job {} completed in {:?}", ctx.job.id(), ctx.duration);
        });
    }
}

Add plugins when building the worker:

let worker = graphile_worker::WorkerOptions::default()
    .define_job::<ProcessData>()
    .pg_pool(pg_pool)
    .add_plugin(LoggingPlugin)
    .init()
    .await?;

Multiple plugins can be added with repeated .add_plugin(...) calls.

Observer vs Interceptor Hooks

Hooks are split into two categories.

Observer hooks return () and are for side effects. They are useful for metrics, tracing, logging, and state snapshots. When an observer event is emitted, every registered handler for that event is called.
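
The fan-out behavior described above can be pictured with a small stand-in model. This is only a sketch: ObserverList and JobCompleteCtx are hypothetical types, the handlers are synchronous for brevity (the real handlers are async), and nothing here reflects how HookRegistry is actually implemented.

```rust
/// Hypothetical context for this sketch; not the library's context type.
pub struct JobCompleteCtx {
    pub job_id: i64,
}

/// Stand-in for the handlers registered for one observer event.
pub struct ObserverList<'a> {
    handlers: Vec<Box<dyn Fn(&JobCompleteCtx) + 'a>>,
}

impl<'a> ObserverList<'a> {
    pub fn new() -> Self {
        Self { handlers: Vec::new() }
    }

    /// Registers one more handler for the event.
    pub fn on(&mut self, handler: impl Fn(&JobCompleteCtx) + 'a) {
        self.handlers.push(Box::new(handler));
    }

    /// Calls every handler. Observers return (), so no handler can stop
    /// the others from running. Returns how many handlers were called.
    pub fn emit(&self, ctx: &JobCompleteCtx) -> usize {
        for handler in &self.handlers {
            handler(ctx);
        }
        self.handlers.len()
    }
}
```

Registering two handlers and emitting one event calls both, which is the property that separates observers from the interceptors described next.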

Interceptor hooks return a result that can affect worker behavior. The common result type is HookResult:

use graphile_worker::HookResult;

HookResult::Continue
HookResult::Skip
HookResult::Fail("reason".to_string())

Continue allows the operation to proceed. Skip and Fail(...) are terminal results and stop the interceptor chain.
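
The short-circuit rule can be sketched with a local stand-in enum. This is an illustration under assumptions, not the library's implementation: the HookResult defined here only mirrors the three variants above, Interceptor and run_chain are hypothetical, the handlers are synchronous, and the sketch assumes interceptors run in the order they are listed.

```rust
/// Local stand-in that mirrors the three variants shown above.
#[derive(Debug, Clone, PartialEq)]
pub enum HookResult {
    Continue,
    Skip,
    Fail(String),
}

/// Hypothetical interceptor signature: a plain function over the task name.
pub type Interceptor = fn(&str) -> HookResult;

/// Runs interceptors in order and stops at the first terminal result.
/// Returns the outcome and the number of interceptors that were called.
pub fn run_chain(interceptors: &[Interceptor], task: &str) -> (HookResult, usize) {
    let mut called = 0;
    for interceptor in interceptors {
        called += 1;
        match interceptor(task) {
            HookResult::Continue => continue,
            // Skip and Fail are terminal: later interceptors never run.
            terminal => return (terminal, called),
        }
    }
    (HookResult::Continue, called)
}

pub fn allow_all(_task: &str) -> HookResult {
    HookResult::Continue
}

pub fn skip_blocked(task: &str) -> HookResult {
    if task == "blocked_task" {
        HookResult::Skip
    } else {
        HookResult::Continue
    }
}

pub fn fail_empty(task: &str) -> HookResult {
    if task.is_empty() {
        HookResult::Fail("empty task identifier".to_string())
    } else {
        HookResult::Continue
    }
}
```

With the chain allow_all, skip_blocked, fail_empty, the task "blocked_task" stops at the second interceptor, so fail_empty is never consulted.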

BeforeJobSchedule uses JobScheduleResult instead. It can continue with a payload, skip scheduling, or fail scheduling:

use graphile_worker::JobScheduleResult;

JobScheduleResult::Continue(payload)
JobScheduleResult::Skip
JobScheduleResult::Fail("reason".to_string())

Schedule interceptors are chained through the payload. If one plugin returns JobScheduleResult::Continue(new_payload), the next schedule interceptor receives that transformed payload. A terminal result stops later schedule interceptors.
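
Payload threading can be modeled the same way. In this sketch the payload is a BTreeMap of strings rather than JSON so that it needs only the standard library. The local JobScheduleResult, ScheduleInterceptor, and run_schedule_chain are hypothetical stand-ins, and the sketch assumes interceptors run in the order they are listed.

```rust
use std::collections::BTreeMap;

/// Stand-in payload type; the real hook works on a JSON payload.
pub type Payload = BTreeMap<String, String>;

/// Local stand-in that mirrors the three variants shown above.
#[derive(Debug, Clone, PartialEq)]
pub enum JobScheduleResult {
    Continue(Payload),
    Skip,
    Fail(String),
}

/// Hypothetical interceptor signature for this sketch.
pub type ScheduleInterceptor = fn(&str, Payload) -> JobScheduleResult;

/// Feeds each interceptor the payload produced by the previous one and
/// stops at the first Skip or Fail.
pub fn run_schedule_chain(
    interceptors: &[ScheduleInterceptor],
    identifier: &str,
    payload: Payload,
) -> JobScheduleResult {
    let mut current = payload;
    for interceptor in interceptors {
        match interceptor(identifier, current) {
            JobScheduleResult::Continue(next) => current = next,
            terminal => return terminal,
        }
    }
    JobScheduleResult::Continue(current)
}

/// Adds a marker key, like the checked_by_hook example later on this page.
pub fn tag_checked(_identifier: &str, mut payload: Payload) -> JobScheduleResult {
    payload.insert("checked_by_hook".to_string(), "true".to_string());
    JobScheduleResult::Continue(payload)
}

/// Only passes if an earlier interceptor added the marker key.
pub fn require_checked(_identifier: &str, payload: Payload) -> JobScheduleResult {
    if payload.contains_key("checked_by_hook") {
        JobScheduleResult::Continue(payload)
    } else {
        JobScheduleResult::Fail("payload was not checked".to_string())
    }
}
```

Order matters in this model: tag_checked followed by require_checked continues with the tagged payload, while the reverse order fails before the tag is ever added.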

Worker Hooks

Worker hooks observe the worker process itself.

| Hook | Context highlights | Type |
| --- | --- | --- |
| WorkerInit | database, schema, concurrency | observer |
| WorkerStart | database, worker_id, extensions | observer |
| WorkerShutdown | database, worker_id, reason | observer |
| WorkerRecovered | worker_id, dead_worker_ids, jobs_recovered | observer |

Example:

use graphile_worker::{HookRegistry, Plugin, WorkerShutdown, WorkerStart};

struct WorkerLogPlugin;

impl Plugin for WorkerLogPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(WorkerStart, async |ctx| {
            println!("worker {} started", ctx.worker_id);
        });

        hooks.on(WorkerShutdown, async |ctx| {
            println!("worker {} stopped: {:?}", ctx.worker_id, ctx.reason);
        });
    }
}

Job Run Hooks

Run hooks observe or intercept jobs as they are fetched and executed.

| Hook | Context highlights | Type |
| --- | --- | --- |
| JobFetch | job, worker_id | observer |
| BeforeJobRun | job, worker_id, payload | interceptor |
| JobStart | job, worker_id | observer |
| AfterJobRun | job, worker_id, result, duration | interceptor |
| JobComplete | job, worker_id, duration | observer |
| JobFail | job, worker_id, error, will_retry | observer |
| JobPermanentlyFail | job, worker_id, error | observer |
| JobInterrupted | job, worker_id, reason | observer |

BeforeJobRun can prevent a task handler from running:

use graphile_worker::{BeforeJobRun, HookRegistry, HookResult, Plugin};

struct ValidationPlugin;

impl Plugin for ValidationPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(BeforeJobRun, async |ctx| {
            let should_skip = ctx
                .payload
                .get("skip")
                .and_then(|value| value.as_bool())
                .unwrap_or(false);

            if should_skip {
                return HookResult::Skip;
            }

            let should_fail = ctx
                .payload
                .get("force_fail")
                .and_then(|value| value.as_bool())
                .unwrap_or(false);

            if should_fail {
                return HookResult::Fail("forced failure by validation hook".into());
            }

            HookResult::Continue
        });
    }
}

When a BeforeJobRun hook skips a job, the task handler is not called. When it fails a job, the task handler is also not called and the normal failure path is used.

AfterJobRun sees the task result and duration. It also returns HookResult, so it can leave the result alone with Continue, mark the job skipped with Skip, or fail it with Fail(...).

Schedule Hooks

BeforeJobSchedule intercepts jobs before they are persisted. It receives the task identifier, JSON payload, and JobSpec.

Use it to reject, validate, or transform scheduled jobs:

use graphile_worker::{BeforeJobSchedule, HookRegistry, JobScheduleResult, Plugin};

struct SchedulePolicyPlugin;

impl Plugin for SchedulePolicyPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(BeforeJobSchedule, async |ctx| {
            if ctx.identifier == "blocked_task" {
                return JobScheduleResult::Skip;
            }

            let mut payload = ctx.payload.clone();
            if let Some(object) = payload.as_object_mut() {
                object.insert("checked_by_hook".into(), serde_json::json!(true));
            }

            JobScheduleResult::Continue(payload)
        });
    }
}

If a schedule hook returns Skip, no job is inserted. If it returns Fail(...), scheduling returns an error. If it returns Continue(payload), that payload is used for the job and passed to any later schedule interceptor.

Cron scheduling has its own observer hooks. The table lists them together with the BeforeJobSchedule interceptor:

| Hook | Context highlights | Type |
| --- | --- | --- |
| CronTick | timestamp, crontabs | observer |
| CronJobScheduled | crontab, scheduled_at | observer |
| BeforeJobSchedule | identifier, payload, spec | interceptor |

Local Queue Hooks

Local queue hooks observe the in-process queue used by a worker.

| Hook | Context highlights | Type |
| --- | --- | --- |
| LocalQueueInit | worker_id | observer |
| LocalQueueSetMode | worker_id, old_mode, new_mode | observer |
| LocalQueueGetJobsComplete | worker_id, jobs_count | observer |
| LocalQueueReturnJobs | worker_id, jobs_count | observer |
| LocalQueueRefetchDelayStart | worker_id, duration, threshold, abort_threshold | observer |
| LocalQueueRefetchDelayAbort | worker_id, count, abort_threshold | observer |
| LocalQueueRefetchDelayExpired | worker_id | observer |

The local queue mode values are Starting, Polling, Waiting, TtlExpired, and Released.

Example:

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use graphile_worker::{
    HookRegistry, LocalQueueGetJobsComplete, LocalQueueSetMode, Plugin,
};

#[derive(Clone)]
struct QueueMetricsPlugin {
    max_fetched: Arc<AtomicUsize>,
}

impl Plugin for QueueMetricsPlugin {
    fn register(self, hooks: &mut HookRegistry) {
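        // The handler is invoked once per event, so each call clones the Arc
        // and moves that clone into the future it returns.
        let max_fetched = self.max_fetched.clone();
        hooks.on(LocalQueueGetJobsComplete, move |ctx| {
            let max_fetched = max_fetched.clone();
            async move {
                // Track the largest batch size seen so far.
                max_fetched.fetch_max(ctx.jobs_count, Ordering::Relaxed);
            }
        });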

        hooks.on(LocalQueueSetMode, async |ctx| {
            println!(
                "local queue mode changed: {:?} -> {:?}",
                ctx.old_mode,
                ctx.new_mode
            );
        });
    }
}
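
Because the plugin is moved into the worker, the application needs its own clone of the Arc to read the metric later. The following is a minimal sketch of that wiring using only the standard library. The struct mirrors the one above without the Plugin impl, metrics_plugin_with_handle and record_batch are hypothetical helpers, and jobs_count is assumed to be a usize.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Mirrors the shape of the plugin above, without the Plugin impl.
#[derive(Clone)]
pub struct QueueMetricsPlugin {
    pub max_fetched: Arc<AtomicUsize>,
}

/// Builds the plugin together with a handle the application keeps.
/// Both point at the same atomic counter.
pub fn metrics_plugin_with_handle() -> (QueueMetricsPlugin, Arc<AtomicUsize>) {
    let max_fetched = Arc::new(AtomicUsize::new(0));
    let plugin = QueueMetricsPlugin {
        max_fetched: max_fetched.clone(),
    };
    (plugin, max_fetched)
}

/// What the hook body does for each batch: keep the largest count seen.
pub fn record_batch(counter: &AtomicUsize, jobs_count: usize) {
    counter.fetch_max(jobs_count, Ordering::Relaxed);
}
```

The plugin half would be passed to .add_plugin(...), while the returned handle stays with the application and can be read with load(Ordering::Relaxed) at any time.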

Recovery Hook

JobRecovery is an interceptor for recovered jobs. Its context includes the job, the current worker_id, the previous_worker_id, and the recovery reason.

It returns JobRecoveryResult:

use graphile_worker::JobRecoveryResult;

JobRecoveryResult::Default
JobRecoveryResult::Reschedule {
    run_at,
    attempts: Some(3),
}
JobRecoveryResult::FailWithBackoff
JobRecoveryResult::Skip

Use Default to keep the worker's standard recovery behavior. Use the other variants only when the recovered job needs explicit policy.

Practical Example

This plugin combines observer hooks for metrics with an interceptor for payload-based validation:

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use graphile_worker::{
    BeforeJobRun, HookRegistry, HookResult, JobComplete, JobFail, JobStart, Plugin,
};

#[derive(Default)]
struct Counters {
    started: AtomicU64,
    completed: AtomicU64,
    failed: AtomicU64,
}

struct MetricsAndValidationPlugin {
    counters: Arc<Counters>,
}

impl Plugin for MetricsAndValidationPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        // Each handler gets its own Arc clone, so all three update the same Counters.
        let counters = self.counters.clone();
        hooks.on(JobStart, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.started.fetch_add(1, Ordering::Relaxed);
            }
        });

        let counters = self.counters.clone();
        hooks.on(JobComplete, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.completed.fetch_add(1, Ordering::Relaxed);
            }
        });

        let counters = self.counters.clone();
        hooks.on(JobFail, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.failed.fetch_add(1, Ordering::Relaxed);
            }
        });

        hooks.on(BeforeJobRun, async |ctx| {
            if ctx
                .payload
                .get("skip")
                .and_then(|value| value.as_bool())
                .unwrap_or(false)
            {
                return HookResult::Skip;
            }

            HookResult::Continue
        });
    }
}

See the repository examples in examples/hooks.rs and examples/hooks/*.rs for a runnable version with logging, metrics, validation, and sample jobs.