18 changes: 3 additions & 15 deletions .github/workflows/ci.yml
@@ -14,24 +14,12 @@ jobs:
ci:
runs-on: ubuntu-latest

services:
postgres:
image: postgres:14-alpine
env:
POSTGRES_DB: test
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- 5436:5432
options: >-
--health-cmd "pg_isready -U postgres -d test"
--health-interval 10s
--health-timeout 5s
--health-retries 5

steps:
- uses: actions/checkout@v4

- name: Start Postgres
run: docker compose up -d --wait

- name: Install uv
uses: astral-sh/setup-uv@v4

107 changes: 107 additions & 0 deletions README.md
@@ -14,6 +14,7 @@ Use at your own risk.
- **Await events** - Pause until external events arrive (with optional timeouts)
- **Retry on failure** - Configurable retry strategies with exponential backoff
- **Scale horizontally** - Multiple workers can process tasks concurrently
- **Cron scheduling** - Run tasks on recurring schedules via pg_cron integration

Unlike exception-based durable execution systems (common in Python and TypeScript), this SDK uses Rust's `Result` type for suspension control flow, keeping suspension idiomatic and type-safe.

@@ -260,6 +261,101 @@ client.emit_event(
).await?;
```

### Cron Scheduling

Schedule tasks to run on a recurring basis using [pg_cron](https://github.com/citusdata/pg_cron). Durable manages the pg_cron jobs and maintains a registry for discovery and filtering.

**Setup** - Enable pg_cron once at startup:

```rust
use durable::setup_pgcron;

// Enable the pg_cron extension (requires superuser or appropriate privileges)
setup_pgcron(client.pool()).await?;
```

**Create a schedule:**

```rust
use std::collections::HashMap;

use durable::{ScheduleOptions, SpawnOptions};
use serde_json::json;

let options = ScheduleOptions {
task_name: "process-payments".to_string(),
cron_expression: "*/5 * * * *".to_string(), // Every 5 minutes
params: json!({"batch_size": 100}),
spawn_options: SpawnOptions::default(),
metadata: Some(HashMap::from([
("team".to_string(), json!("payments")),
("env".to_string(), json!("production")),
])),
};

// Creates or updates the schedule (upsert semantics)
client.create_schedule("payment-schedule", options).await?;
```

**List and filter schedules:**

```rust
use std::collections::HashMap;

use durable::ScheduleFilter;
use serde_json::json;

// List all schedules on this queue
let all = client.list_schedules(None).await?;

// Filter by task name
let filter = ScheduleFilter {
task_name: Some("process-payments".to_string()),
..Default::default()
};
let filtered = client.list_schedules(Some(filter)).await?;

// Filter by metadata (JSONB containment)
let filter = ScheduleFilter {
metadata: Some(HashMap::from([("team".to_string(), json!("payments"))])),
..Default::default()
};
let filtered = client.list_schedules(Some(filter)).await?;
```
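Metadata filtering follows JSONB containment semantics: a schedule matches when its metadata contains every key/value pair in the filter, and extra keys on the schedule are ignored. The real check runs in Postgres (via the GIN-indexed containment operator), so the helper below is only a plain-Rust sketch of the matching rule, not part of the SDK:

```rust
use std::collections::HashMap;

/// Sketch of the JSONB containment rule used by metadata filtering:
/// every key/value pair in `filter` must also appear in `metadata`;
/// keys present only in `metadata` do not affect the match.
fn metadata_contains(metadata: &HashMap<String, String>, filter: &HashMap<String, String>) -> bool {
    filter.iter().all(|(key, value)| metadata.get(key) == Some(value))
}
```

Note that an empty filter matches every schedule, which is why `list_schedules(None)` and an all-default `ScheduleFilter` return the same set.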

**Delete a schedule:**

```rust
// Removes the schedule and its pg_cron job. In-flight tasks are not cancelled.
client.delete_schedule("payment-schedule").await?;
```

**Key behaviors:**

- **pg_cron integration** - Schedules are backed by PostgreSQL's pg_cron extension. At each tick, pg_cron inserts a task into the queue via `durable.spawn_task()`, and workers pick it up normally.
- **Upsert semantics** - Calling `create_schedule` with an existing name updates the schedule in place.
- **Origin tracking** - Scheduled tasks automatically receive `durable::scheduled_by` and `durable::cron` headers, so tasks can identify how they were spawned.
- **Metadata filtering** - Attach arbitrary JSON metadata to schedules and filter with JSONB containment queries.
- **Queue cleanup** - Dropping a queue automatically unschedules all its cron jobs.
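Because scheduled tasks carry the `durable::scheduled_by` and `durable::cron` headers, a task can branch on how it was spawned. The exact context API for reading headers inside a task is not shown in this README, so the sketch below operates on a plain header map; treat the accessor you pair it with as an assumption:

```rust
use std::collections::HashMap;

// Header keys from the origin-tracking behavior described above.
const SCHEDULED_BY: &str = "durable::scheduled_by";

/// Returns true when the headers indicate the task was spawned by a cron
/// schedule. Generic over the value type so it works whether headers are
/// stored as strings or as JSON values.
fn spawned_by_cron<V>(headers: &HashMap<String, V>) -> bool {
    headers.contains_key(SCHEDULED_BY)
}
```

A manually spawned task never carries the `durable::` headers (user-provided headers with that prefix are rejected at spawn time), so presence of the key is a reliable signal.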

### Polling Task Results

You can poll for the result of a spawned task without running a worker in the same process:

```rust
use durable::TaskStatus;

let spawned = client.spawn::<MyTask>(params).await?;

// Poll for the result
let result = client.get_task_result(spawned.task_id).await?;

if let Some(poll) = result {
match poll.status {
TaskStatus::Completed => println!("Output: {:?}", poll.output),
TaskStatus::Failed => println!("Error: {:?}", poll.error),
TaskStatus::Pending | TaskStatus::Running | TaskStatus::Sleeping => {
println!("Still in progress...")
}
TaskStatus::Cancelled => println!("Task was cancelled"),
}
}
```
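To block until the task finishes, the single poll above can be wrapped in a loop that sleeps between attempts and stops at a terminal state. The loop needs a live client, so it is left as a commented sketch; the terminal-state check is real code, with `TaskStatus` mirrored locally for illustration (same variants as the API reference in this README):

```rust
/// Local mirror of `durable::TaskStatus`, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskStatus {
    Pending,
    Running,
    Sleeping,
    Completed,
    Failed,
    Cancelled,
}

/// Polling can stop once the task reaches a terminal state.
fn is_terminal(status: TaskStatus) -> bool {
    matches!(
        status,
        TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled
    )
}

// Sketch of the wait loop (assumes the client API shown above; not
// runnable standalone):
//
// loop {
//     if let Some(poll) = client.get_task_result(spawned.task_id).await? {
//         if is_terminal(poll.status) {
//             break;
//         }
//     }
//     tokio::time::sleep(std::time::Duration::from_millis(500)).await;
// }
```

A fixed 500 ms interval is an arbitrary choice here; back off or add a deadline as your workload requires.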

### Transactional Spawning

You can atomically enqueue a task as part of a larger database transaction. This ensures that either both your write and the task spawn succeed, or neither does:
@@ -318,11 +414,22 @@ This is useful when you need to guarantee that a task is only enqueued if relate
| [`RetryStrategy`] | Retry behavior: `None`, `Fixed`, or `Exponential` |
| [`CancellationPolicy`] | Auto-cancel tasks based on delay or duration |

### Cron Scheduling

| Type | Description |
|------|-------------|
| [`ScheduleOptions`] | Options for creating a cron schedule (task, expression, params, metadata) |
| [`ScheduleInfo`] | Information about an existing schedule |
| [`ScheduleFilter`] | Filter for listing schedules (by task name or metadata) |
| [`setup_pgcron()`] | Initialize the pg_cron extension |

### Results

| Type | Description |
|------|-------------|
| [`SpawnResult`] | Returned when spawning a task (task_id, run_id, attempt) |
| [`TaskPollResult`] | Result of polling a task (status, output, error) |
| [`TaskStatus`] | Task state: `Pending`, `Running`, `Sleeping`, `Completed`, `Failed`, `Cancelled` |
| [`ControlFlow`] | Signals for suspension and cancellation |

## Environment Variables
6 changes: 5 additions & 1 deletion docker-compose.yml
@@ -1,10 +1,14 @@
services:
postgres:
image: postgres:14-alpine
build:
context: ../../tensorzero-core/tests/e2e
dockerfile: Dockerfile.postgres
image: tensorzero/postgres-dev:14-trixie-slim
environment:
POSTGRES_DB: test
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
command: ["postgres", "-c", "cron.database_name=test"]
ports:
- "5436:5432"
healthcheck:
45 changes: 45 additions & 0 deletions sql/schema.sql
@@ -58,6 +58,26 @@ create table if not exists durable.queues (
created_at timestamptz not null default durable.current_time()
);

create table if not exists durable.cron_schedules (
schedule_name text not null,
queue_name text not null,
task_name text not null,
cron_expression text not null,
params jsonb not null default '{}'::jsonb,
spawn_options jsonb not null default '{}'::jsonb,
metadata jsonb not null default '{}'::jsonb,
pgcron_job_name text not null,
created_at timestamptz not null default durable.current_time(),
updated_at timestamptz not null default durable.current_time(),
primary key (queue_name, schedule_name)
);

create index if not exists idx_cron_schedules_metadata
on durable.cron_schedules using gin (metadata);

create index if not exists idx_cron_schedules_task_name
on durable.cron_schedules (queue_name, task_name);

create function durable.ensure_queue_tables (p_queue_name text)
returns void
language plpgsql
@@ -252,12 +272,15 @@ end;
$$;

-- Drop a queue if it exists.
-- Also cleans up any cron schedules and their pg_cron jobs for the queue.
create function durable.drop_queue (p_queue_name text)
returns void
language plpgsql
as $$
declare
v_existing_queue text;
v_rec record;
v_jobid bigint;
begin
select queue_name into v_existing_queue
from durable.queues
@@ -267,6 +290,28 @@ begin
return;
end if;

-- Clean up any cron schedules associated with this queue
for v_rec in
select pgcron_job_name
from durable.cron_schedules
where queue_name = p_queue_name
loop
begin
select jobid into v_jobid
from cron.job
where jobname = v_rec.pgcron_job_name;

if v_jobid is not null then
perform cron.unschedule(v_jobid);
end if;
exception when others then
-- pg_cron may not be installed; ignore errors
null;
end;
end loop;

delete from durable.cron_schedules where queue_name = p_queue_name;

execute format('drop table if exists durable.%I cascade', 'w_' || p_queue_name);
execute format('drop table if exists durable.%I cascade', 'e_' || p_queue_name);
execute format('drop table if exists durable.%I cascade', 'c_' || p_queue_name);
8 changes: 6 additions & 2 deletions src/client.rs
@@ -55,7 +55,7 @@ impl CancellationPolicyDb {
use crate::worker::Worker;

/// Validates that user-provided headers don't use reserved prefixes.
fn validate_headers(headers: &Option<HashMap<String, JsonValue>>) -> DurableResult<()> {
pub(crate) fn validate_headers(headers: &Option<HashMap<String, JsonValue>>) -> DurableResult<()> {
if let Some(headers) = headers {
for key in headers.keys() {
if key.starts_with("durable::") {
@@ -334,6 +334,10 @@
&self.state
}

pub(crate) fn spawn_defaults(&self) -> &SpawnDefaults {
&self.spawn_defaults
}

/// Register a task type. Required before spawning or processing.
///
/// Returns an error if a task with the same name is already registered.
@@ -578,7 +582,7 @@
})
}

fn serialize_spawn_options(
pub(crate) fn serialize_spawn_options(
options: &SpawnOptions,
max_attempts: u32,
) -> serde_json::Result<JsonValue> {