First Worker
This page walks through the smallest useful Graphile Worker RS flow:
- Create a PostgreSQL pool.
- Define a task handler.
- Configure WorkerOptions.
- Initialize the worker.
- Add a job.
- Run the worker.
- Let shutdown happen gracefully.
The examples use Tokio and SQLx, matching the default setup used by the repository examples.
Define a Handler
A worker processes jobs by matching each job's task identifier to a registered
TaskHandler. The handler type is also the payload type, so it should derive
Serialize and Deserialize.
use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct SayHello {
    message: String,
}
impl TaskHandler for SayHello {
    const IDENTIFIER: &'static str = "say_hello";
    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Hello {} !", self.message);
        Ok::<(), String>(())
    }
}
IDENTIFIER is the task name stored in PostgreSQL. Register the same handler
type on any worker that should be able to process jobs with that identifier.
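To make the matching step concrete, here is a minimal model of dispatching by task identifier that uses only the standard library. The Registry type, its define_job and dispatch methods, and the say_hello function are hypothetical illustrations, not Graphile Worker RS's implementation; the real worker reads jobs from PostgreSQL and deserializes payloads with Serde.

```rust
use std::collections::HashMap;

/// Hypothetical handler signature for this sketch: receives the raw payload
/// and returns a message on success or an error string on failure.
type Handler = Box<dyn Fn(&str) -> Result<String, String>>;

/// Hypothetical registry that maps task identifiers to handlers.
#[derive(Default)]
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    /// Register a handler under a task identifier (builder style).
    fn define_job(mut self, identifier: &'static str, handler: Handler) -> Self {
        self.handlers.insert(identifier, handler);
        self
    }

    /// Find the handler whose identifier matches the job and run it.
    fn dispatch(&self, identifier: &str, payload: &str) -> Result<String, String> {
        match self.handlers.get(identifier) {
            Some(handler) => handler(payload),
            None => Err(format!("no handler registered for task '{}'", identifier)),
        }
    }
}

/// Example handler for the sketch, analogous in spirit to `SayHello::run`.
fn say_hello(payload: &str) -> Result<String, String> {
    Ok(format!("Hello {} !", payload))
}
```

In this model, a registry built with define_job("say_hello", Box::new(say_hello)) can dispatch "say_hello" jobs, while a job with an identifier that was never registered produces an error instead of running.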
Create the Pool
You can pass an existing SQLx PostgreSQL pool with pg_pool. This is useful
when your application already owns database configuration.
use std::str::FromStr;
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
let pg_options = PgConnectOptions::from_str(
    "postgres://postgres:root@localhost:5432",
)?;
let pg_pool = PgPoolOptions::new()
    .max_connections(5)
    .connect_with(pg_options)
    .await?;
You can also let Graphile Worker create the pool from a URL with
database_url. When using database_url, max_pg_conn controls the pool size
and defaults to 20 if it is not set.
let options = graphile_worker::WorkerOptions::default()
    .database_url("postgres://postgres:root@localhost:5432")
    .max_pg_conn(5);
If both an existing database connection and database_url are provided, the
existing connection takes precedence.
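The precedence rule can be sketched as a small resolution function. This is a standard-library-only model under stated assumptions: ExistingPool, ConnectionOptions, ResolvedConnection, and resolve_connection are hypothetical names, and returning an error when nothing is configured is an assumption of this sketch rather than documented library behavior.

```rust
/// Hypothetical stand-in for an already-built connection pool.
#[derive(Debug, Clone, PartialEq)]
struct ExistingPool {
    max_connections: u32,
}

/// Hypothetical subset of the connection-related options.
#[derive(Debug, Default)]
struct ConnectionOptions {
    pg_pool: Option<ExistingPool>,
    database_url: Option<String>,
    max_pg_conn: Option<u32>,
}

/// What the worker ends up connecting with in this model.
#[derive(Debug, PartialEq)]
enum ResolvedConnection {
    Existing(ExistingPool),
    FromUrl { url: String, max_connections: u32 },
}

/// Default pool size when only `database_url` is given, as stated above.
const DEFAULT_MAX_PG_CONN: u32 = 20;

fn resolve_connection(options: ConnectionOptions) -> Result<ResolvedConnection, String> {
    // An existing pool takes precedence over `database_url`.
    if let Some(pool) = options.pg_pool {
        return Ok(ResolvedConnection::Existing(pool));
    }
    match options.database_url {
        // `max_pg_conn` sizes the pool created from the URL.
        Some(url) => Ok(ResolvedConnection::FromUrl {
            url,
            max_connections: options.max_pg_conn.unwrap_or(DEFAULT_MAX_PG_CONN),
        }),
        // Assumption for this sketch only: no connection source is an error.
        None => Err("no database connection configured".to_string()),
    }
}
```

With both fields set, the model returns the existing pool and ignores the URL; with only a URL and no max_pg_conn, it falls back to 20 connections.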
Configure and Initialize
WorkerOptions is a builder. The usual first-worker options are:
- concurrency: maximum number of jobs processed at the same time. If omitted, it defaults to the number of logical CPUs.
- schema: PostgreSQL schema used for Graphile Worker tables. If omitted, it defaults to graphile_worker.
- define_job::<T>(): registers a TaskHandler type.
- pg_pool, database, or database_url: provides the database connection.
Calling init().await connects to the database if needed, runs migrations for
the selected schema, registers task details, creates the worker id, and returns
a ready Worker.
use graphile_worker::WorkerOptions;
let worker = WorkerOptions::default()
    .concurrency(2)
    .schema("example_simple_worker")
    .define_job::<SayHello>()
    .pg_pool(pg_pool)
    .init()
    .await?;
Add a Job
After initialization, create utilities from the worker and enqueue a job.
JobSpec::default() schedules it with default job options.
use graphile_worker::JobSpec;
let helpers = worker.create_utils();
helpers
    .add_job(
        SayHello {
            message: "world".to_string(),
        },
        JobSpec::default(),
    )
    .await?;
To schedule a job for later, build a JobSpec with JobSpecBuilder.
use chrono::{Duration, Utc};
use graphile_worker::JobSpecBuilder;
helpers
    .add_job(
        SayHello {
            message: "world".to_string(),
        },
        JobSpecBuilder::new()
            .run_at(Utc::now() + Duration::seconds(10))
            .build(),
    )
    .await?;
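The effect of run_at can be pictured with a small eligibility check. This is a conceptual sketch using only the standard library; ScheduledJob and its methods are hypothetical, and it uses std::time rather than chrono so that it stays self-contained. The assumption is that a job becomes eligible once the current time reaches its run_at value.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical job record holding only the scheduling field.
#[derive(Debug, Clone, Copy)]
struct ScheduledJob {
    run_at: SystemTime,
}

impl ScheduledJob {
    /// Schedule a job `delay` after `now`, similar to `run_at(Utc::now() + ...)`.
    fn after(now: SystemTime, delay: Duration) -> Self {
        ScheduledJob { run_at: now + delay }
    }

    /// In this model a job is eligible once its `run_at` time has been reached.
    fn is_due(&self, now: SystemTime) -> bool {
        self.run_at <= now
    }
}
```

A job created with a 10 second delay is not due immediately, and becomes due once 10 seconds have elapsed.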
Run the Worker
run() starts the long-running worker loop. It registers the worker, starts job
sources, processes jobs until shutdown is requested, waits for batchers to
finish, emits shutdown hooks, and deregisters the worker.
worker.run().await?;
For scripts and smoke tests, run_once() processes currently available jobs and
then returns.
worker.run_once().await?;
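The difference between the two entry points can be modeled with an in-memory queue. This is a simplified sketch, not the library's implementation: ToyQueue and its methods are hypothetical, jobs are plain strings, and the shutdown check is a closure instead of a signal or future.

```rust
use std::collections::VecDeque;

/// Hypothetical in-memory stand-in for the jobs table.
#[derive(Default)]
struct ToyQueue {
    pending: VecDeque<String>,
}

impl ToyQueue {
    fn add_job(&mut self, name: &str) {
        self.pending.push_back(name.to_string());
    }

    /// Model of `run_once`: process what is available now, then return.
    fn run_once(&mut self) -> Vec<String> {
        let mut processed = Vec::new();
        while let Some(job) = self.pending.pop_front() {
            processed.push(job);
        }
        processed
    }

    /// Model of `run`: keep checking for work until shutdown is requested.
    fn run(&mut self, mut shutdown_requested: impl FnMut() -> bool) -> Vec<String> {
        let mut processed = Vec::new();
        while !shutdown_requested() {
            processed.extend(self.run_once());
        }
        processed
    }
}
```

In this model, run_once returns as soon as the queue is empty, so jobs added afterwards need another call, whereas run keeps looping until the shutdown check returns true.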
Full Example
use std::str::FromStr;
use graphile_worker::{
    IntoTaskHandlerResult, JobSpec, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
#[derive(Deserialize, Serialize)]
struct SayHello {
    message: String,
}
impl TaskHandler for SayHello {
    const IDENTIFIER: &'static str = "say_hello";
    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Hello {} !", self.message);
        Ok::<(), String>(())
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pg_options = PgConnectOptions::from_str(
        "postgres://postgres:root@localhost:5432",
    )?;
    let pg_pool = PgPoolOptions::new()
        .max_connections(5)
        .connect_with(pg_options)
        .await?;
    let worker = WorkerOptions::default()
        .concurrency(2)
        .schema("example_simple_worker")
        .define_job::<SayHello>()
        .pg_pool(pg_pool)
        .init()
        .await?;
    let helpers = worker.create_utils();
    helpers
        .add_job(
            SayHello {
                message: "world".to_string(),
            },
            JobSpec::default(),
        )
        .await?;
    worker.run().await?;
    Ok(())
}
Shutdown
By default, the worker listens for OS shutdown signals such as Ctrl+C and SIGTERM. When a shutdown signal arrives, in-flight jobs are given a grace period to finish. The default grace period is 5 seconds, and shutdown-aborted jobs are retried after a default delay of 30 seconds.
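The grace-period rule can be expressed as a small decision function. This is a standard-library-only sketch: ShutdownPolicy, ShutdownOutcome, and outcome_for are hypothetical names, the defaults mirror the 5 second and 30 second values stated above, and treating a job that finishes exactly at the deadline as completed is an assumption of this model.

```rust
use std::time::Duration;

/// What happens to an in-flight job during shutdown in this model.
#[derive(Debug, PartialEq)]
enum ShutdownOutcome {
    /// The job finished within the grace period.
    Completed,
    /// The job was aborted and becomes eligible again after this delay.
    RetryAfter(Duration),
}

/// Hypothetical policy holding the two shutdown timings.
struct ShutdownPolicy {
    grace_period: Duration,
    interrupted_job_retry_delay: Duration,
}

impl Default for ShutdownPolicy {
    fn default() -> Self {
        ShutdownPolicy {
            grace_period: Duration::from_secs(5),
            interrupted_job_retry_delay: Duration::from_secs(30),
        }
    }
}

impl ShutdownPolicy {
    /// Decide the outcome for a job that still needs `remaining_work` to finish.
    fn outcome_for(&self, remaining_work: Duration) -> ShutdownOutcome {
        if remaining_work <= self.grace_period {
            ShutdownOutcome::Completed
        } else {
            ShutdownOutcome::RetryAfter(self.interrupted_job_retry_delay)
        }
    }
}
```

Under the default policy, a job that needs 2 more seconds completes, while a job that needs 8 more seconds is aborted and retried after 30 seconds. Raising the grace period to 10 seconds lets the second job complete as well.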
If the host application already owns shutdown handling, pass a custom shutdown future and disable OS signal listeners:
use graphile_worker::{WorkerOptions, WorkerShutdownConfig};
use std::time::Duration;
let shutdown = WorkerShutdownConfig::default()
    .listen_os_shutdown_signals(false)
    .grace_period(Duration::from_secs(10))
    .interrupted_job_retry_delay(Duration::from_secs(30))
    .shutdown_signal(async {
        // Complete this future when your application wants the worker to stop.
    });
let worker = WorkerOptions::default()
    .worker_shutdown(shutdown)
    .define_job::<SayHello>()
    .pg_pool(pg_pool)
    .init()
    .await?;
worker.run().await?;
The same settings can also be configured directly on WorkerOptions with
listen_os_shutdown_signals, shutdown_signal, shutdown_grace_period, and
shutdown_interrupted_job_retry_delay.
Next, see Scheduling Jobs for job timing options and Worker Options for more configuration details.