Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Worker Lifecycle

A worker is built with WorkerOptions, initialized with init(), and then started with either run() or run_once(). The builder collects configuration; init() turns that configuration into a ready Worker.

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(5)
    .schema("graphile_worker")
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?;

worker.run().await?;

Worker::options() is an equivalent entry point if you prefer to start from the worker type:

let worker = graphile_worker::Worker::options()
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?;

Initialization

WorkerOptions::init() performs the setup work that must happen before jobs can be processed:

  • uses the supplied database connection, or creates one from database_url
  • runs the database migrations for the configured schema
  • registers the configured task identifiers in the database
  • creates a random worker id with the graphile_worker_ prefix
  • applies defaults such as a one second poll interval and CPU-count concurrency
  • prepares shutdown, recovery, hook, queue, and batcher state

init() returns an error if the worker cannot connect to the database, no database URL or pool was supplied, migrations fail, task registration fails, or local queue configuration is invalid.

Database Configuration

A worker needs a PostgreSQL database before init() can succeed. You can pass a URL and let Graphile Worker RS create the pool:

let worker = graphile_worker::WorkerOptions::default()
    .database_url("postgres://postgres:password@localhost/mydb")
    .max_pg_conn(20)
    .define_job::<SendEmail>()
    .init()
    .await?;

Or you can pass an existing database connection. With the SQLx driver, pg_pool is a convenience wrapper:

let pg_pool = sqlx::postgres::PgPoolOptions::new()
    .max_connections(5)
    .connect("postgres://postgres:password@localhost/mydb")
    .await?;

let worker = graphile_worker::WorkerOptions::default()
    .pg_pool(pg_pool)
    .define_job::<SendEmail>()
    .init()
    .await?;

If both an existing database connection and a URL are provided, the existing connection is used. max_pg_conn only applies when the worker creates a pool from database_url.

The schema defaults to graphile_worker. Set a custom schema when you want to isolate worker tables:

let worker = graphile_worker::WorkerOptions::default()
    .schema("my_app_worker")
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?;

Task Registration

Workers only run tasks registered on the builder. The common path is define_job::<T>(), where T implements TaskHandler:

use graphile_worker::{
    IntoTaskHandlerResult, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Sending email to {}", self.to);
    }
}

let worker = WorkerOptions::default()
    .define_job::<SendEmail>()
    .database_url("postgres://postgres:password@localhost/mydb")
    .init()
    .await?;

For reusable modules, collect definitions and register them with define_jobs. Batch handlers can be registered with define_batch_job.

Jobs with forbidden flags are skipped by that worker:

let worker = graphile_worker::WorkerOptions::default()
    .add_forbidden_flag("high_memory")
    .define_job::<SendEmail>()
    .database_url("postgres://postgres:password@localhost/mydb")
    .init()
    .await?;

Concurrency and Polling

concurrency controls the maximum number of jobs the worker processes at the same time. If it is not set, the default is the number of logical CPUs. Passing 0 panics.

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(10)
    .poll_interval(std::time::Duration::from_millis(500))
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?;

poll_interval controls how often the worker checks PostgreSQL for work when notification delivery is not enough, or when jobs are scheduled for the future. The default is one second.

Running Continuously

Worker::run() is the normal long-running mode for application workers. It:

  • registers the worker for recovery bookkeeping
  • emits the worker start hook
  • starts the crontab scheduler and the job runner together
  • runs until its shutdown signal resolves or an error stops the runner
  • waits for completion and failure batchers to shut down
  • stops recovery tasks
  • emits the worker shutdown hook
  • deregisters the worker
graphile_worker::WorkerOptions::default()
    .concurrency(5)
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?
    .run()
    .await?;

Use run() for service processes, web application sidecars, and deployments where the worker should keep listening for new jobs.

Running Once

Worker::run_once() processes currently available jobs and then returns. It uses the same configured concurrency and task handlers, but it does not start the continuous lifecycle used by run().

This is useful for scripts, tests, local maintenance commands, and one-shot workers:

let worker = graphile_worker::WorkerOptions::default()
    .concurrency(2)
    .schema("example_simple_worker")
    .define_job::<SayHello>()
    .pg_pool(pg_pool)
    .init()
    .await?;

let helpers = worker.create_utils();

for i in 0..10 {
    helpers
        .add_job(
            SayHello {
                message: format!("world {}", i),
            },
            graphile_worker::JobSpec::default(),
        )
        .await?;
}

worker.run_once().await?;

When a run-once job belongs to a queue, the worker checks for another job after releasing it so queued work can continue in order.

Shutdown

Each worker has an internal shutdown signal. By default, Graphile Worker RS also listens for OS shutdown signals such as SIGINT and SIGTERM. You can request shutdown from application code:

worker.request_shutdown();

If your host application already owns shutdown handling, pass a custom signal and disable the built-in OS listeners:

let worker = graphile_worker::WorkerOptions::default()
    .listen_os_shutdown_signals(false)
    .shutdown_signal(async {
        wait_for_application_shutdown().await;
    })
    .shutdown_grace_period(std::time::Duration::from_secs(10))
    .shutdown_interrupted_job_retry_delay(std::time::Duration::from_secs(30))
    .database_url("postgres://postgres:password@localhost/mydb")
    .define_job::<SendEmail>()
    .init()
    .await?;

worker.run().await?;

The same settings can be supplied as a WorkerShutdownConfig with worker_shutdown(config).

Dropping a Worker also notifies the internal shutdown signal as a safety net, but normal applications should shut down explicitly through the configured signal or request_shutdown().

Worker IDs and the Node.js Version

Graphile Worker RS is compatible with the original Node.js Graphile Worker for the queue schema, but worker identity is different. In the Node.js version, each process has its own worker_id. In the Rust version, init() creates one worker id for the Worker instance, and jobs are processed concurrently by the enabled async runtime using that same worker id.