Worker Options
WorkerOptions is the main builder used to configure a worker before calling
init(). The builder collects database settings, registered task handlers,
scheduling rules, hooks, and performance options, then init() connects to
PostgreSQL, runs migrations, registers task details, and returns a runnable
worker.
Most applications start with WorkerOptions::default(), chain the options they
need, then call init().await? followed by run().await?.
use graphile_worker::WorkerOptions;
let worker = WorkerOptions::default()
.concurrency(5)
.schema("graphile_worker")
.define_job::<SendEmail>()
.pg_pool(pg_pool)
.init()
.await?;
worker.run().await?;
Database configuration
Every worker needs a database connection before init() can succeed. You can
either pass an existing connection pool or let the worker create one from a
database URL.
let worker = WorkerOptions::default()
.database_url("postgres://user:password@localhost/mydb")
.max_pg_conn(20)
.define_job::<SendEmail>()
.init()
.await?;
When you already have a SQLx pool, pass it directly:
let worker = WorkerOptions::default()
.pg_pool(pg_pool)
.define_job::<SendEmail>()
.init()
.await?;
Important database methods:
database(value)uses an existing driver-agnostic database connection.pg_pool(pool)is the SQLx convenience wrapper for passing an existingsqlx::PgPool.database_url(url)stores the PostgreSQL URL used when no existing connection was provided.max_pg_conn(count)sets the maximum pool size when the worker creates a pool fromdatabase_url; the default is20.
If both an existing connection and database_url are provided, the existing
connection takes precedence.
Core runtime settings
Use the core options to control where worker tables live and how aggressively the worker processes jobs.
use std::time::Duration;
let worker = WorkerOptions::default()
.schema("my_app_worker")
.concurrency(10)
.poll_interval(Duration::from_millis(500))
.use_notification_delivery(true)
.use_local_time(false)
.pg_pool(pg_pool)
.define_job::<SendEmail>()
.init()
.await?;
Important core methods:
schema(name)sets the PostgreSQL schema for Graphile Worker tables. If it is not set, the default schema isgraphile_worker.concurrency(count)sets the maximum number of jobs processed at the same time. If it is not set, the worker uses the number of logical CPUs. Passing0panics.poll_interval(duration)controls fallback polling for new or future jobs. The default is one second.use_notification_delivery(value)controls whether the worker listens for PostgreSQLNOTIFYwakeups. The default istrue; set it tofalseto use polling only. Notifications are treated as coalesced wakeup hints; when all workers are already saturated, extra notifications are dropped and the runner keeps draining the listener instead of blocking on worker fanout.use_local_time(value)controls whether timestamps use application time (true) or PostgreSQL server time (false). The default is PostgreSQL server time.
PostgreSQL server time is the safer default when multiple worker processes run against the same database.
Job registration
Register every task handler that this worker should be able to run. The common
case is one or more define_job::<T>() calls:
let worker = WorkerOptions::default()
.define_job::<SendEmail>()
.define_job::<GenerateReport>()
.pg_pool(pg_pool)
.init()
.await?;
For larger applications, modules can expose reusable job definitions and the worker can register them together:
use graphile_worker::{JobDefinition, TaskHandler};
pub fn jobs() -> [JobDefinition; 2] {
[
SendEmail::definition(),
GenerateReport::definition(),
]
}
let worker = WorkerOptions::default()
.define_jobs(jobs())
.pg_pool(pg_pool)
.init()
.await?;
Important job methods:
define_job::<T>()registers aTaskHandler.define_batch_job::<T>()registers aBatchTaskHandler.define_jobs(iterable)registers reusableJobDefinitionvalues.add_forbidden_flag(flag)makes this worker skip jobs with that flag.
When add_forbidden_flag is used, local queue configuration is disabled during
initialization.
Cron schedules
Cron entries are added with with_cron or with_crons. Typed cron builders are
useful when you want Rust types to define the task and payload:
use graphile_worker::{Cron, CronJobKeyMode, CrontabFill, WorkerOptions};
let worker = WorkerOptions::default()
.define_job::<SayHello>()
.pg_pool(pg_pool)
.with_cron(
Cron::every_n_minutes::<SayHello>(2)?
.fill(CrontabFill::minutes(10))
.job_key("say_hello_dedupe")
.job_key_mode(CronJobKeyMode::PreserveRunAt)
.payload(SayHello {
message: "Crontab".to_string(),
})?,
)
.init()
.await?;
Crontab text is also accepted, but because it must be parsed, the call returns a
Result:
let options = WorkerOptions::default()
.define_job::<SendDigest>()
.with_cron("0 8 * * * send_digest")?;
Use with_crons([...]) when you already have multiple typed crontab values to
append.
Local queue
local_queue(config) enables batch-fetching jobs into an in-process local
queue. This can reduce PostgreSQL load for high-throughput workers by fetching
several jobs at once and distributing them locally to the worker's concurrency
slots.
use graphile_worker::{LocalQueueConfig, RefetchDelayConfig, WorkerOptions};
use std::time::Duration;
let worker = WorkerOptions::default()
.concurrency(4)
.schema("example_local_queue")
.define_job::<ProcessItem>()
.pg_pool(pg_pool)
.local_queue(
LocalQueueConfig::default()
.with_size(50)
.with_ttl(Duration::from_secs(60))
.with_refetch_delay(
RefetchDelayConfig::default()
.with_duration(Duration::from_millis(100))
.with_threshold(5)
.with_max_abort_threshold(20),
),
)
.init()
.await?;
Local queue settings are validated against the configured poll_interval during
init(). If forbidden flags are configured, the worker ignores the local queue
configuration.
Hooks, plugins, and extensions
Worker options can also compose application state and hook behavior:
let options = WorkerOptions::default()
.add_extension(app_config)
.add_plugin(LocalQueueMonitorPlugin::default())
.on(JobStart, |ctx| async move {
tracing::info!("job {} started", ctx.job.id());
});
Important composition methods:
add_extension(value)stores typed application state that handlers can read from the worker context.on(event, handler)registers a hook handler directly.add_plugin(plugin)registers a plugin that can attach several hooks.
Plugins are a good fit when a feature needs to register several related hooks, such as monitoring local queue events.
Completion and failure batching
For high-throughput workers, completion and permanent failure updates can be batched to reduce SQL round trips:
use std::time::Duration;
let worker = WorkerOptions::default()
.complete_job_batch_delay(Duration::from_millis(5))
.fail_job_batch_delay(Duration::from_millis(5))
.define_job::<ProcessItem>()
.pg_pool(pg_pool)
.init()
.await?;
complete_job_batch_delay(delay) batches successful completion updates.
fail_job_batch_delay(delay) batches permanent failure updates. Retryable
failures are still processed individually so retry backoff timing can be
preserved.
Composition examples
Basic worker
This is the smallest common shape: configure the database, register jobs, and run.
let worker = WorkerOptions::default()
.schema("example_simple_worker")
.concurrency(2)
.define_job::<SayHello>()
.pg_pool(pg_pool)
.init()
.await?;
worker.run().await?;
Scheduled worker
Scheduled workers are ordinary workers with cron entries added before initialization.
let worker = WorkerOptions::default()
.schema("example_simple_worker")
.concurrency(10)
.define_job::<SayHello>()
.define_job::<SayHello2>()
.pg_pool(pg_pool)
.with_cron(
Cron::every_n_minutes::<SayHello>(2)?
.fill(CrontabFill::minutes(10))
.payload(SayHello {
message: "Crontab".to_string(),
})?,
)
.init()
.await?;
Throughput-oriented worker
For job bursts, combine a tuned concurrency value with local queueing and small batching delays.
use std::time::Duration;
let worker = WorkerOptions::default()
.schema("jobs")
.concurrency(16)
.poll_interval(Duration::from_millis(500))
.complete_job_batch_delay(Duration::from_millis(5))
.fail_job_batch_delay(Duration::from_millis(5))
.local_queue(
LocalQueueConfig::default()
.with_size(100)
.with_ttl(Duration::from_secs(60)),
)
.define_jobs(jobs())
.pg_pool(pg_pool)
.init()
.await?;
Tune these values with your own workload, PostgreSQL latency, pool size, and job duration. Higher concurrency is useful only when the database pool and task workload can support it.