Worker Lifecycle
A worker is built with WorkerOptions, initialized with init(), and then
started with either run() or run_once(). The builder collects configuration;
init() turns that configuration into a ready Worker.
let worker = graphile_worker::WorkerOptions::default()
.concurrency(5)
.schema("graphile_worker")
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?;
worker.run().await?;
Worker::options() is an equivalent entry point if you prefer to start from the
worker type:
let worker = graphile_worker::Worker::options()
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?;
Initialization
WorkerOptions::init() performs the setup work that must happen before jobs can
be processed:
- uses the supplied database connection, or creates one from
database_url - runs the database migrations for the configured schema
- registers the configured task identifiers in the database
- creates a random worker id with the
graphile_worker_prefix - applies defaults such as a one second poll interval and CPU-count concurrency
- prepares shutdown, recovery, hook, queue, and batcher state
init() returns an error if the worker cannot connect to the database, no
database URL or pool was supplied, migrations fail, task registration fails, or
local queue configuration is invalid.
Database Configuration
A worker needs a PostgreSQL database before init() can succeed. You can pass a
URL and let Graphile Worker RS create the pool:
let worker = graphile_worker::WorkerOptions::default()
.database_url("postgres://postgres:password@localhost/mydb")
.max_pg_conn(20)
.define_job::<SendEmail>()
.init()
.await?;
Or you can pass an existing database connection. With the SQLx driver, pg_pool
is a convenience wrapper:
let pg_pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect("postgres://postgres:password@localhost/mydb")
.await?;
let worker = graphile_worker::WorkerOptions::default()
.pg_pool(pg_pool)
.define_job::<SendEmail>()
.init()
.await?;
If both an existing database connection and a URL are provided, the existing
connection is used. max_pg_conn only applies when the worker creates a pool
from database_url.
The schema defaults to graphile_worker. Set a custom schema when you want to
isolate worker tables:
let worker = graphile_worker::WorkerOptions::default()
.schema("my_app_worker")
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?;
Task Registration
Workers only run tasks registered on the builder. The common path is
define_job::<T>(), where T implements TaskHandler:
use graphile_worker::{
IntoTaskHandlerResult, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Sending email to {}", self.to);
}
}
let worker = WorkerOptions::default()
.define_job::<SendEmail>()
.database_url("postgres://postgres:password@localhost/mydb")
.init()
.await?;
For reusable modules, collect definitions and register them with define_jobs.
Batch handlers can be registered with define_batch_job.
Jobs with forbidden flags are skipped by that worker:
let worker = graphile_worker::WorkerOptions::default()
.add_forbidden_flag("high_memory")
.define_job::<SendEmail>()
.database_url("postgres://postgres:password@localhost/mydb")
.init()
.await?;
Concurrency and Polling
concurrency controls the maximum number of jobs the worker processes at the
same time. If it is not set, the default is the number of logical CPUs. Passing
0 panics.
let worker = graphile_worker::WorkerOptions::default()
.concurrency(10)
.poll_interval(std::time::Duration::from_millis(500))
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?;
poll_interval controls how often the worker checks PostgreSQL for work when
notification delivery is not enough, or when jobs are scheduled for the future.
The default is one second.
Running Continuously
Worker::run() is the normal long-running mode for application workers. It:
- registers the worker for recovery bookkeeping
- emits the worker start hook
- starts the crontab scheduler and the job runner together
- runs until its shutdown signal resolves or an error stops the runner
- waits for completion and failure batchers to shut down
- stops recovery tasks
- emits the worker shutdown hook
- deregisters the worker
graphile_worker::WorkerOptions::default()
.concurrency(5)
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?
.run()
.await?;
Use run() for service processes, web application sidecars, and deployments
where the worker should keep listening for new jobs.
Running Once
Worker::run_once() processes currently available jobs and then returns. It
uses the same configured concurrency and task handlers, but it does not start
the continuous lifecycle used by run().
This is useful for scripts, tests, local maintenance commands, and one-shot workers:
let worker = graphile_worker::WorkerOptions::default()
.concurrency(2)
.schema("example_simple_worker")
.define_job::<SayHello>()
.pg_pool(pg_pool)
.init()
.await?;
let helpers = worker.create_utils();
for i in 0..10 {
helpers
.add_job(
SayHello {
message: format!("world {}", i),
},
graphile_worker::JobSpec::default(),
)
.await?;
}
worker.run_once().await?;
When a run-once job belongs to a queue, the worker checks for another job after releasing it so queued work can continue in order.
Shutdown
Each worker has an internal shutdown signal. By default, Graphile Worker RS also
listens for OS shutdown signals such as SIGINT and SIGTERM. You can request
shutdown from application code:
worker.request_shutdown();
If your host application already owns shutdown handling, pass a custom signal and disable the built-in OS listeners:
let worker = graphile_worker::WorkerOptions::default()
.listen_os_shutdown_signals(false)
.shutdown_signal(async {
wait_for_application_shutdown().await;
})
.shutdown_grace_period(std::time::Duration::from_secs(10))
.shutdown_interrupted_job_retry_delay(std::time::Duration::from_secs(30))
.database_url("postgres://postgres:password@localhost/mydb")
.define_job::<SendEmail>()
.init()
.await?;
worker.run().await?;
The same settings can be supplied as a WorkerShutdownConfig with
worker_shutdown(config).
Dropping a Worker also notifies the internal shutdown signal as a safety net,
but normal applications should shut down explicitly through the configured
signal or request_shutdown().
Worker IDs and the Node.js Version
Graphile Worker RS is compatible with the original Node.js Graphile Worker for
the queue schema, but worker identity is different. In the Node.js version, each
process has its own worker_id. In the Rust version, init() creates one
worker id for the Worker instance, and jobs are processed concurrently by the
enabled async runtime using that same worker id.