diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 439ab08..b80da00 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,24 +14,12 @@ jobs: ci: runs-on: ubuntu-latest - services: - postgres: - image: postgres:14-alpine - env: - POSTGRES_DB: test - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - ports: - - 5436:5432 - options: >- - --health-cmd "pg_isready -U postgres -d test" - --health-interval 10s - --health-timeout 5s - --health-retries 5 - steps: - uses: actions/checkout@v4 + - name: Start Postgres + run: docker compose up -d --wait + - name: Install uv uses: astral-sh/setup-uv@v4 diff --git a/README.md b/README.md index 2d6e5e6..320c069 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Use at your own risk. - **Await events** - Pause until external events arrive (with optional timeouts) - **Retry on failure** - Configurable retry strategies with exponential backoff - **Scale horizontally** - Multiple workers can process tasks concurrently +- **Cron scheduling** - Run tasks on recurring schedules via pg_cron integration Unlike exception-based durable execution systems (Python, TypeScript), this SDK uses Rust's `Result` type for suspension control flow, making it idiomatic and type-safe. @@ -260,6 +261,101 @@ client.emit_event( ).await?; ``` +### Cron Scheduling + +Schedule tasks to run on a recurring basis using [pg_cron](https://github.com/citusdata/pg_cron). Durable manages the pg_cron jobs and maintains a registry for discovery and filtering. 
+ +**Setup** - Enable pg_cron once at startup: + +```rust +use durable::setup_pgcron; + +// Enable the pg_cron extension (requires superuser or appropriate privileges) +setup_pgcron(client.pool()).await?; +``` + +**Create a schedule:** + +```rust +use durable::ScheduleOptions; + +let options = ScheduleOptions { + task_name: "process-payments".to_string(), + cron_expression: "*/5 * * * *".to_string(), // Every 5 minutes + params: json!({"batch_size": 100}), + spawn_options: SpawnOptions::default(), + metadata: Some(HashMap::from([ + ("team".to_string(), json!("payments")), + ("env".to_string(), json!("production")), + ])), +}; + +// Creates or updates the schedule (upsert semantics) +client.create_schedule("payment-schedule", options).await?; +``` + +**List and filter schedules:** + +```rust +use durable::ScheduleFilter; + +// List all schedules on this queue +let all = client.list_schedules(None).await?; + +// Filter by task name +let filter = ScheduleFilter { + task_name: Some("process-payments".to_string()), + ..Default::default() +}; +let filtered = client.list_schedules(Some(filter)).await?; + +// Filter by metadata (JSONB containment) +let filter = ScheduleFilter { + metadata: Some(HashMap::from([("team".to_string(), json!("payments"))])), + ..Default::default() +}; +let filtered = client.list_schedules(Some(filter)).await?; +``` + +**Delete a schedule:** + +```rust +// Removes the schedule and its pg_cron job. In-flight tasks are not cancelled. +client.delete_schedule("payment-schedule").await?; +``` + +**Key behaviors:** + +- **pg_cron integration** - Schedules are backed by PostgreSQL's pg_cron extension. At each tick, pg_cron inserts a task into the queue via `durable.spawn_task()`, and workers pick it up normally. +- **Upsert semantics** - Calling `create_schedule` with an existing name updates the schedule in place. 
+- **Origin tracking** - Scheduled tasks automatically receive `durable::scheduled_by` and `durable::cron` headers, so tasks can identify how they were spawned.
+- **Metadata filtering** - Attach arbitrary JSON metadata to schedules and filter with JSONB containment queries.
+- **Queue cleanup** - Dropping a queue automatically unschedules all its cron jobs.
+
+### Polling Task Results
+
+You can poll for the result of a spawned task without running a worker in the same process:
+
+```rust
+use durable::TaskStatus;
+
+let spawned = client.spawn::<MyTask>(params).await?;
+
+// Poll for the result
+let result = client.get_task_result(spawned.task_id).await?;
+
+if let Some(poll) = result {
+    match poll.status {
+        TaskStatus::Completed => println!("Output: {:?}", poll.output),
+        TaskStatus::Failed => println!("Error: {:?}", poll.error),
+        TaskStatus::Pending | TaskStatus::Running | TaskStatus::Sleeping => {
+            println!("Still in progress...")
+        }
+        TaskStatus::Cancelled => println!("Task was cancelled"),
+    }
+}
+```
+
+### Transactional Spawning
+
+You can atomically enqueue a task as part of a larger database transaction.
This ensures that either both your write and the task spawn succeed, or neither does: @@ -318,11 +414,22 @@ This is useful when you need to guarantee that a task is only enqueued if relate | [`RetryStrategy`] | Retry behavior: `None`, `Fixed`, or `Exponential` | | [`CancellationPolicy`] | Auto-cancel tasks based on delay or duration | +### Cron Scheduling + +| Type | Description | +|------|-------------| +| [`ScheduleOptions`] | Options for creating a cron schedule (task, expression, params, metadata) | +| [`ScheduleInfo`] | Information about an existing schedule | +| [`ScheduleFilter`] | Filter for listing schedules (by task name or metadata) | +| [`setup_pgcron()`] | Initialize the pg_cron extension | + ### Results | Type | Description | |------|-------------| | [`SpawnResult`] | Returned when spawning a task (task_id, run_id, attempt) | +| [`TaskPollResult`] | Result of polling a task (status, output, error) | +| [`TaskStatus`] | Task state: `Pending`, `Running`, `Sleeping`, `Completed`, `Failed`, `Cancelled` | | [`ControlFlow`] | Signals for suspension and cancellation | ## Environment Variables diff --git a/docker-compose.yml b/docker-compose.yml index d14e472..7122877 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,10 +1,14 @@ services: postgres: - image: postgres:14-alpine + build: + context: ../../tensorzero-core/tests/e2e + dockerfile: Dockerfile.postgres + image: tensorzero/postgres-dev:14-trixie-slim environment: POSTGRES_DB: test POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres + command: ["postgres", "-c", "cron.database_name=test"] ports: - "5436:5432" healthcheck: diff --git a/sql/schema.sql b/sql/schema.sql index 33917f7..fc678bc 100644 --- a/sql/schema.sql +++ b/sql/schema.sql @@ -58,6 +58,26 @@ create table if not exists durable.queues ( created_at timestamptz not null default durable.current_time() ); +create table if not exists durable.cron_schedules ( + schedule_name text not null, + queue_name text not null, + task_name text 
not null, + cron_expression text not null, + params jsonb not null default '{}'::jsonb, + spawn_options jsonb not null default '{}'::jsonb, + metadata jsonb not null default '{}'::jsonb, + pgcron_job_name text not null, + created_at timestamptz not null default durable.current_time(), + updated_at timestamptz not null default durable.current_time(), + primary key (queue_name, schedule_name) +); + +create index if not exists idx_cron_schedules_metadata + on durable.cron_schedules using gin (metadata); + +create index if not exists idx_cron_schedules_task_name + on durable.cron_schedules (queue_name, task_name); + create function durable.ensure_queue_tables (p_queue_name text) returns void language plpgsql @@ -252,12 +272,15 @@ end; $$; -- Drop a queue if it exists. +-- Also cleans up any cron schedules and their pg_cron jobs for the queue. create function durable.drop_queue (p_queue_name text) returns void language plpgsql as $$ declare v_existing_queue text; + v_rec record; + v_jobid bigint; begin select queue_name into v_existing_queue from durable.queues @@ -267,6 +290,28 @@ begin return; end if; + -- Clean up any cron schedules associated with this queue + for v_rec in + select pgcron_job_name + from durable.cron_schedules + where queue_name = p_queue_name + loop + begin + select jobid into v_jobid + from cron.job + where jobname = v_rec.pgcron_job_name; + + if v_jobid is not null then + perform cron.unschedule(v_jobid); + end if; + exception when others then + -- pg_cron may not be installed; ignore errors + null; + end; + end loop; + + delete from durable.cron_schedules where queue_name = p_queue_name; + execute format('drop table if exists durable.%I cascade', 'w_' || p_queue_name); execute format('drop table if exists durable.%I cascade', 'e_' || p_queue_name); execute format('drop table if exists durable.%I cascade', 'c_' || p_queue_name); diff --git a/src/client.rs b/src/client.rs index ad8d8a2..0fae58d 100644 --- a/src/client.rs +++ b/src/client.rs @@ 
-55,7 +55,7 @@ impl CancellationPolicyDb {
 use crate::worker::Worker;
 
 /// Validates that user-provided headers don't use reserved prefixes.
-fn validate_headers(headers: &Option<HashMap<String, JsonValue>>) -> DurableResult<()> {
+pub(crate) fn validate_headers(headers: &Option<HashMap<String, JsonValue>>) -> DurableResult<()> {
     if let Some(headers) = headers {
         for key in headers.keys() {
             if key.starts_with("durable::") {
@@ -334,6 +334,10 @@
         &self.state
     }
 
+    pub(crate) fn spawn_defaults(&self) -> &SpawnDefaults {
+        &self.spawn_defaults
+    }
+
     /// Register a task type. Required before spawning or processing.
     ///
     /// Returns an error if a task with the same name is already registered.
@@ -578,7 +582,7 @@
         })
     }
 
-    fn serialize_spawn_options(
+    pub(crate) fn serialize_spawn_options(
         options: &SpawnOptions,
         max_attempts: u32,
     ) -> serde_json::Result<JsonValue> {
diff --git a/src/cron.rs b/src/cron.rs
new file mode 100644
index 0000000..7cdd22a
--- /dev/null
+++ b/src/cron.rs
@@ -0,0 +1,496 @@
+use chrono::{DateTime, Utc};
+use serde_json::Value as JsonValue;
+use sqlx::{PgPool, Postgres, QueryBuilder};
+use std::collections::HashMap;
+
+use crate::client::{Durable, validate_headers};
+use crate::error::{DurableError, DurableResult};
+use crate::types::SpawnOptions;
+
+/// Options for creating a cron schedule.
+#[derive(Debug, Clone)]
+pub struct ScheduleOptions {
+    /// The task name to spawn on each cron tick.
+    pub task_name: String,
+    /// Standard 5-field cron expression (minute hour day-of-month month day-of-week).
+    pub cron_expression: String,
+    /// Parameters to pass to the spawned task (serialized as JSON).
+    pub params: JsonValue,
+    /// Spawn options (max_attempts, retry_strategy, cancellation, headers).
+    pub spawn_options: SpawnOptions,
+    /// Arbitrary user-defined metadata for categorization/filtering.
+    pub metadata: Option<HashMap<String, JsonValue>>,
+}
+
+/// Information about an existing cron schedule.
+#[derive(Debug, Clone)]
+pub struct ScheduleInfo {
+    /// The schedule name (unique within the queue).
+    pub name: String,
+    /// The cron expression.
+    pub cron_expression: String,
+    /// The task name that gets spawned.
+    pub task_name: String,
+    /// The parameters passed to the spawned task.
+    pub params: JsonValue,
+    /// The serialized spawn options.
+    pub spawn_options: JsonValue,
+    /// User-defined metadata.
+    pub metadata: HashMap<String, JsonValue>,
+    /// The pg_cron job name.
+    pub pgcron_job_name: String,
+    /// When the schedule was created.
+    pub created_at: DateTime<Utc>,
+    /// When the schedule was last updated.
+    pub updated_at: DateTime<Utc>,
+}
+
+/// Filter for listing schedules.
+#[derive(Debug, Clone, Default)]
+pub struct ScheduleFilter {
+    /// Filter by task name (exact match).
+    pub task_name: Option<String>,
+    /// Filter by metadata (JSONB `@>` containment).
+    /// e.g. `{"team": "payments"}` matches schedules whose metadata contains that key-value.
+    pub metadata: Option<HashMap<String, JsonValue>>,
+}
+
+/// Set up the pg_cron extension in the database.
+///
+/// Attempts to create the extension and verifies it is available.
+/// Call this once during application startup before using cron scheduling.
+///
+/// # Errors
+///
+/// Returns [`DurableError::InvalidConfiguration`] if the extension cannot be created
+/// or is not available.
+pub async fn setup_pgcron(pool: &PgPool) -> DurableResult<()> {
+    // Attempt to create the extension, ignoring errors (user may not have privileges)
+    let _ = sqlx::query(
+        "DO $$ BEGIN CREATE EXTENSION IF NOT EXISTS pg_cron; EXCEPTION WHEN OTHERS THEN NULL; END $$"
+    )
+    .execute(pool)
+    .await;
+
+    // Verify it exists
+    let exists: (bool,) =
+        sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_cron')")
+            .fetch_one(pool)
+            .await?;
+
+    if !exists.0 {
+        return Err(DurableError::InvalidConfiguration {
+            reason: "pg_cron extension is not installed and could not be created".to_string(),
+        });
+    }
+
+    Ok(())
+}
+
+impl<State> Durable<State>
+where
+    State: Clone + Send + Sync + 'static,
+{
+    /// Create or update a cron schedule.
+ /// + /// If a schedule with the same name already exists in this queue, it will be updated + /// (upsert semantics). The pg_cron job is created/updated and the schedule metadata + /// is stored in `durable.cron_schedules`. + /// + /// # Arguments + /// + /// * `schedule_name` - Unique name for this schedule within the queue + /// (alphanumeric, hyphens, and underscores only). + /// * `options` - Schedule configuration including task name, cron expression, + /// params, spawn options, and metadata. + /// + /// # Errors + /// + /// Returns an error if: + /// - The schedule name is invalid + /// - Headers use the reserved `durable::` prefix + /// - pg_cron is not available + /// - The cron expression is rejected by pg_cron + pub async fn create_schedule( + &self, + schedule_name: &str, + options: ScheduleOptions, + ) -> DurableResult<()> { + // Validate inputs + validate_schedule_name(schedule_name)?; + validate_headers(&options.spawn_options.headers)?; + + // Check pg_cron availability early before doing any work + let exists: (bool,) = + sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_cron')") + .fetch_one(self.pool()) + .await?; + + if !exists.0 { + return Err(DurableError::InvalidConfiguration { + reason: "pg_cron extension is not installed".to_string(), + }); + } + + let pgcron_job_name = format!("durable::{}::{}", self.queue_name(), schedule_name); + + // Build spawn options with injected durable:: headers + let mut spawn_options = options.spawn_options.clone(); + let headers = spawn_options.headers.get_or_insert_with(HashMap::new); + headers.insert( + "durable::scheduled_by".to_string(), + JsonValue::String(schedule_name.to_string()), + ); + headers.insert( + "durable::cron".to_string(), + JsonValue::String(options.cron_expression.clone()), + ); + + let max_attempts = spawn_options + .max_attempts + .unwrap_or(self.spawn_defaults().max_attempts); + let spawn_options = SpawnOptions { + retry_strategy: spawn_options + 
.retry_strategy + .or_else(|| self.spawn_defaults().retry_strategy.clone()), + cancellation: spawn_options + .cancellation + .or_else(|| self.spawn_defaults().cancellation.clone()), + ..spawn_options + }; + let db_options = Self::serialize_spawn_options(&spawn_options, max_attempts) + .map_err(DurableError::Serialization)?; + + // Build the SQL command that pg_cron will execute + let spawn_sql = build_pgcron_spawn_sql( + self.queue_name(), + &options.task_name, + &options.params, + &db_options, + )?; + + let metadata_value = match &options.metadata { + Some(m) => serde_json::to_value(m).map_err(DurableError::Serialization)?, + None => serde_json::json!({}), + }; + + // Execute in a transaction + let mut tx = self.pool().begin().await?; + + // Schedule the pg_cron job (has built-in upsert semantics) + sqlx::query("SELECT cron.schedule($1, $2, $3)") + .bind(&pgcron_job_name) + .bind(&options.cron_expression) + .bind(&spawn_sql) + .execute(&mut *tx) + .await?; + + // Upsert into our schedule registry + sqlx::query( + "INSERT INTO durable.cron_schedules + (schedule_name, queue_name, task_name, cron_expression, params, spawn_options, metadata, pgcron_job_name, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, durable.current_time(), durable.current_time()) + ON CONFLICT (queue_name, schedule_name) + DO UPDATE SET + task_name = EXCLUDED.task_name, + cron_expression = EXCLUDED.cron_expression, + params = EXCLUDED.params, + spawn_options = EXCLUDED.spawn_options, + metadata = EXCLUDED.metadata, + pgcron_job_name = EXCLUDED.pgcron_job_name, + updated_at = durable.current_time()" + ) + .bind(schedule_name) + .bind(self.queue_name()) + .bind(&options.task_name) + .bind(&options.cron_expression) + .bind(&options.params) + .bind(&db_options) + .bind(&metadata_value) + .bind(&pgcron_job_name) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + /// Delete a cron schedule. + /// + /// Removes the pg_cron job and the schedule registry entry. 
Any in-flight + /// tasks that were already spawned by this schedule are not cancelled. + /// + /// # Errors + /// + /// Returns [`DurableError::ScheduleNotFound`] if the schedule does not exist. + pub async fn delete_schedule(&self, schedule_name: &str) -> DurableResult<()> { + // Look up the pgcron_job_name + let row: Option<(String,)> = sqlx::query_as( + "SELECT pgcron_job_name FROM durable.cron_schedules + WHERE queue_name = $1 AND schedule_name = $2", + ) + .bind(self.queue_name()) + .bind(schedule_name) + .fetch_optional(self.pool()) + .await?; + + let (pgcron_job_name,) = row.ok_or_else(|| DurableError::ScheduleNotFound { + schedule_name: schedule_name.to_string(), + queue_name: self.queue_name().to_string(), + })?; + + let mut tx = self.pool().begin().await?; + + // Look up the jobid from cron.job and unschedule it + let job_row: Option<(i64,)> = + sqlx::query_as("SELECT jobid FROM cron.job WHERE jobname = $1") + .bind(&pgcron_job_name) + .fetch_optional(&mut *tx) + .await?; + + if let Some((jobid,)) = job_row { + sqlx::query("SELECT cron.unschedule($1)") + .bind(jobid) + .execute(&mut *tx) + .await?; + } + + // Delete from our registry + sqlx::query( + "DELETE FROM durable.cron_schedules + WHERE queue_name = $1 AND schedule_name = $2", + ) + .bind(self.queue_name()) + .bind(schedule_name) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + /// List cron schedules, optionally filtered. + /// + /// Returns all schedules for this queue. If a filter is provided, results + /// are narrowed by task name and/or metadata containment. + /// + /// This only queries the `durable.cron_schedules` table (no pg_cron queries), + /// so it works even if pg_cron is not installed. 
+    pub async fn list_schedules(
+        &self,
+        filter: Option<ScheduleFilter>,
+    ) -> DurableResult<Vec<ScheduleInfo>> {
+        let filter = filter.unwrap_or_default();
+
+        let mut qb: QueryBuilder<Postgres> = QueryBuilder::new(
+            "SELECT schedule_name, cron_expression, task_name, params, spawn_options, metadata, pgcron_job_name, created_at, updated_at
+            FROM durable.cron_schedules
+            WHERE queue_name = ",
+        );
+        qb.push_bind(self.queue_name());
+
+        if let Some(ref task_name) = filter.task_name {
+            qb.push(" AND task_name = ").push_bind(task_name.clone());
+        }
+
+        if let Some(ref metadata) = filter.metadata {
+            let metadata_json =
+                serde_json::to_value(metadata).map_err(DurableError::Serialization)?;
+            qb.push(" AND metadata @> ")
+                .push_bind(metadata_json)
+                .push("::jsonb");
+        }
+
+        qb.push(" ORDER BY schedule_name");
+
+        let rows: Vec<ScheduleRow> = qb
+            .build_query_as::<ScheduleRow>()
+            .fetch_all(self.pool())
+            .await?;
+
+        rows.into_iter()
+            .map(|row| {
+                let metadata: HashMap<String, JsonValue> =
+                    serde_json::from_value(row.metadata).unwrap_or_default();
+                Ok(ScheduleInfo {
+                    name: row.schedule_name,
+                    cron_expression: row.cron_expression,
+                    task_name: row.task_name,
+                    params: row.params,
+                    spawn_options: row.spawn_options,
+                    metadata,
+                    pgcron_job_name: row.pgcron_job_name,
+                    created_at: row.created_at,
+                    updated_at: row.updated_at,
+                })
+            })
+            .collect()
+    }
+}
+
+// --- Internal types ---
+
+#[derive(Debug, sqlx::FromRow)]
+struct ScheduleRow {
+    schedule_name: String,
+    cron_expression: String,
+    task_name: String,
+    params: JsonValue,
+    spawn_options: JsonValue,
+    metadata: JsonValue,
+    pgcron_job_name: String,
+    created_at: DateTime<Utc>,
+    updated_at: DateTime<Utc>,
+}
+
+// --- Validation helpers ---
+
+/// Validate a schedule name (alphanumeric, hyphens, underscores only; non-empty).
+fn validate_schedule_name(name: &str) -> DurableResult<()> {
+    if name.is_empty() {
+        return Err(DurableError::InvalidScheduleName {
+            name: String::new(),
+            reason: "schedule name cannot be empty".to_string(),
+        });
+    }
+
+    if !name
+        .chars()
+        .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
+    {
+        return Err(DurableError::InvalidScheduleName {
+            name: name.to_string(),
+            reason:
+                "contains invalid characters (only alphanumeric, hyphens, and underscores allowed)"
+                    .to_string(),
+        });
+    }
+
+    Ok(())
+}
+
+// --- SQL escaping ---
+
+/// Dollar-quote a string using `$durable$` as the delimiter.
+/// Returns an error if the content contains `$durable`, which would break the delimiter.
+fn pg_literal(s: &str) -> Result<String, DurableError> {
+    if s.contains("$durable") {
+        return Err(DurableError::InvalidConfiguration {
+            reason: format!("string contains reserved delimiter sequence '$durable': {s}"),
+        });
+    }
+    Ok(format!("$durable${s}$durable$"))
+}
+
+/// Build the SQL command that pg_cron will execute to spawn a task.
+fn build_pgcron_spawn_sql(
+    queue_name: &str,
+    task_name: &str,
+    params: &JsonValue,
+    spawn_options: &JsonValue,
+) -> Result<String, DurableError> {
+    let params_str = params.to_string();
+    let options_str = spawn_options.to_string();
+
+    Ok(format!(
+        "SELECT durable.spawn_task({}, {}, {}::jsonb, {}::jsonb)",
+        pg_literal(queue_name)?,
+        pg_literal(task_name)?,
+        pg_literal(&params_str)?,
+        pg_literal(&options_str)?,
+    ))
+}
+
+#[cfg(test)]
+#[expect(clippy::unwrap_used)]
+mod tests {
+    use super::*;
+
+    // --- pg_literal tests ---
+
+    #[test]
+    fn test_pg_literal_simple() {
+        assert_eq!(pg_literal("hello").unwrap(), "$durable$hello$durable$");
+    }
+
+    #[test]
+    fn test_pg_literal_with_single_quotes() {
+        assert_eq!(
+            pg_literal("it's a test").unwrap(),
+            "$durable$it's a test$durable$"
+        );
+    }
+
+    #[test]
+    fn test_pg_literal_with_json() {
+        let json = r#"{"key": "value"}"#;
+        assert_eq!(
+            pg_literal(json).unwrap(),
+            format!("$durable${json}$durable$")
+        );
+    }
+
+    #[test]
+    fn test_pg_literal_rejects_delimiter() {
+        assert!(pg_literal("contains $durable$ in it").is_err());
+    }
+
+    #[test]
+    fn test_pg_literal_rejects_partial_delimiter() {
+        assert!(pg_literal("test$durable").is_err());
+        assert!(pg_literal("$durablefoo").is_err());
+        assert!(pg_literal("mid$durablemid").is_err());
+    }
+
+    // --- Schedule name validation tests ---
+
+    #[test]
+    fn test_valid_schedule_names() {
+        assert!(validate_schedule_name("my-schedule").is_ok());
+        assert!(validate_schedule_name("task_1").is_ok());
+        assert!(validate_schedule_name("DailyReport").is_ok());
+        assert!(validate_schedule_name("a").is_ok());
+        assert!(validate_schedule_name("test-123_abc").is_ok());
+    }
+
+    #[test]
+    fn test_invalid_schedule_name_empty() {
+        let err = validate_schedule_name("").unwrap_err();
+        assert!(matches!(err, DurableError::InvalidScheduleName { ..
})); + } + + #[test] + fn test_invalid_schedule_name_spaces() { + let err = validate_schedule_name("my schedule").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + } + + #[test] + fn test_invalid_schedule_name_semicolons() { + let err = validate_schedule_name("drop;table").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + } + + #[test] + fn test_invalid_schedule_name_special_chars() { + let err = validate_schedule_name("name@here").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + let err = validate_schedule_name("name.here").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + let err = validate_schedule_name("name/here").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + } + + // --- build_pgcron_spawn_sql tests --- + + #[test] + fn test_build_pgcron_spawn_sql() { + let params = serde_json::json!({"key": "value"}); + let options = serde_json::json!({"max_attempts": 3}); + let sql = build_pgcron_spawn_sql("my_queue", "my_task", ¶ms, &options).unwrap(); + assert!(sql.contains("durable.spawn_task")); + assert!(sql.contains("my_queue")); + assert!(sql.contains("my_task")); + assert!(sql.contains("::jsonb")); + } +} diff --git a/src/error.rs b/src/error.rs index 7c4a20d..fcb5192 100644 --- a/src/error.rs +++ b/src/error.rs @@ -437,6 +437,24 @@ pub enum DurableError { /// The unrecognized state string. state: String, }, + + /// Schedule name failed validation. + #[error("invalid schedule name `{name}`: {reason}")] + InvalidScheduleName { + /// The invalid schedule name. + name: String, + /// Why the name is invalid. + reason: String, + }, + + /// Schedule not found. + #[error("schedule `{schedule_name}` not found in queue `{queue_name}`")] + ScheduleNotFound { + /// The schedule name that was not found. + schedule_name: String, + /// The queue name that was searched. 
+ queue_name: String, + }, } /// Result type alias for Client API operations. diff --git a/src/lib.rs b/src/lib.rs index eef201c..fed9b2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,7 @@ mod client; mod context; +mod cron; mod error; mod task; #[cfg(feature = "telemetry")] @@ -106,6 +107,7 @@ mod worker; // Re-export public API pub use client::{Durable, DurableBuilder}; pub use context::TaskContext; +pub use cron::{ScheduleFilter, ScheduleInfo, ScheduleOptions, setup_pgcron}; pub use error::{ControlFlow, DurableError, DurableResult, TaskError, TaskResult}; pub use task::{ErasedTask, Task, TaskWrapper}; pub use types::{ diff --git a/src/postgres/migrations/20260228000000_add_cron_schedules.sql b/src/postgres/migrations/20260228000000_add_cron_schedules.sql new file mode 100644 index 0000000..223b0fd --- /dev/null +++ b/src/postgres/migrations/20260228000000_add_cron_schedules.sql @@ -0,0 +1,73 @@ +-- Cron schedule registry table. +-- Stores metadata for schedules managed by pg_cron via the Durable client API. +-- This table always exists (even without pg_cron installed), so list_schedules() works regardless. + +CREATE TABLE IF NOT EXISTS durable.cron_schedules ( + schedule_name TEXT NOT NULL, + queue_name TEXT NOT NULL, + task_name TEXT NOT NULL, + cron_expression TEXT NOT NULL, + params JSONB NOT NULL DEFAULT '{}'::jsonb, + spawn_options JSONB NOT NULL DEFAULT '{}'::jsonb, + metadata JSONB NOT NULL DEFAULT '{}'::jsonb, + pgcron_job_name TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT durable.current_time(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT durable.current_time(), + PRIMARY KEY (queue_name, schedule_name) +); + +CREATE INDEX IF NOT EXISTS idx_cron_schedules_metadata + ON durable.cron_schedules USING gin (metadata); + +CREATE INDEX IF NOT EXISTS idx_cron_schedules_task_name + ON durable.cron_schedules (queue_name, task_name); + +-- Override drop_queue to clean up cron schedules and their pg_cron jobs. 
+CREATE OR REPLACE FUNCTION durable.drop_queue (p_queue_name text) + returns void + language plpgsql +as $$ +declare + v_existing_queue text; + v_rec record; + v_jobid bigint; +begin + select queue_name into v_existing_queue + from durable.queues + where queue_name = p_queue_name; + + if v_existing_queue is null then + return; + end if; + + -- Clean up any cron schedules associated with this queue + for v_rec in + select pgcron_job_name + from durable.cron_schedules + where queue_name = p_queue_name + loop + begin + select jobid into v_jobid + from cron.job + where jobname = v_rec.pgcron_job_name; + + if v_jobid is not null then + perform cron.unschedule(v_jobid); + end if; + exception when others then + -- pg_cron may not be installed; ignore errors + null; + end; + end loop; + + delete from durable.cron_schedules where queue_name = p_queue_name; + + execute format('drop table if exists durable.%I cascade', 'w_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 'e_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 'c_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 'r_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 't_' || p_queue_name); + + delete from durable.queues where queue_name = p_queue_name; +end; +$$; diff --git a/tests/cron_test.rs b/tests/cron_test.rs new file mode 100644 index 0000000..87ccc1d --- /dev/null +++ b/tests/cron_test.rs @@ -0,0 +1,540 @@ +#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] + +mod common; + +use durable::{ + Durable, DurableError, MIGRATOR, ScheduleFilter, ScheduleOptions, SpawnOptions, setup_pgcron, +}; +use serde_json::json; +use sqlx::migrate::MigrateDatabase; +use sqlx::{AssertSqlSafe, PgPool, Row}; +use std::collections::HashMap; + +/// Connect to the real `test` database (where pg_cron lives) and run migrations. 
+/// `sqlx::test` creates temporary databases where pg_cron cannot be installed, +/// so cron tests must use the real database instead. +async fn setup_pool() -> PgPool { + let url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + + // Ensure the database exists + if !sqlx::Postgres::database_exists(&url).await.unwrap() { + sqlx::Postgres::create_database(&url).await.unwrap(); + } + + let pool = PgPool::connect(&url).await.expect("connect to test db"); + MIGRATOR.run(&pool).await.expect("run migrations"); + setup_pgcron(&pool).await.expect("setup pg_cron"); + pool +} + +/// Clean up a queue after a test (removes cron schedules, pg_cron jobs, and queue tables). +async fn cleanup_queue(pool: &PgPool, queue_name: &str) { + sqlx::query("SELECT durable.drop_queue($1)") + .bind(queue_name) + .execute(pool) + .await + .expect("cleanup queue"); +} + +#[tokio::test] +async fn test_setup_pgcron() { + let url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + let pool = PgPool::connect(&url).await.expect("connect to test db"); + MIGRATOR.run(&pool).await.expect("run migrations"); + setup_pgcron(&pool).await.unwrap(); +} + +#[tokio::test] +async fn test_create_and_list_schedule() { + let pool = setup_pool().await; + let queue = "test_cron_create_list"; + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name(queue) + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let mut metadata = HashMap::new(); + metadata.insert("team".to_string(), json!("payments")); + metadata.insert("env".to_string(), json!("production")); + + let options = ScheduleOptions { + task_name: "process-payments".to_string(), + cron_expression: "*/5 * * * *".to_string(), + params: json!({"batch_size": 100}), + spawn_options: SpawnOptions::default(), + metadata: Some(metadata), + }; + + durable + .create_schedule("payment-schedule", options) + .await + .unwrap(); + + let schedules = 
durable.list_schedules(None).await.unwrap();
+    assert_eq!(schedules.len(), 1);
+
+    let schedule = &schedules[0];
+    assert_eq!(schedule.name, "payment-schedule");
+    assert_eq!(schedule.cron_expression, "*/5 * * * *");
+    assert_eq!(schedule.task_name, "process-payments");
+    assert_eq!(schedule.params, json!({"batch_size": 100}));
+    assert_eq!(schedule.metadata.get("team"), Some(&json!("payments")));
+    assert_eq!(schedule.metadata.get("env"), Some(&json!("production")));
+    assert_eq!(
+        schedule.pgcron_job_name,
+        "durable::test_cron_create_list::payment-schedule"
+    );
+
+    cleanup_queue(&pool, queue).await;
+}
+
+#[tokio::test]
+async fn test_create_schedule_upsert() {
+    let pool = setup_pool().await;
+    let queue = "test_cron_upsert";
+
+    let durable = Durable::builder()
+        .pool(pool.clone())
+        .queue_name(queue)
+        .build()
+        .await
+        .unwrap();
+
+    durable.create_queue(None).await.unwrap();
+
+    // Create initial schedule
+    let options = ScheduleOptions {
+        task_name: "task-v1".to_string(),
+        cron_expression: "*/5 * * * *".to_string(),
+        params: json!({"version": 1}),
+        spawn_options: SpawnOptions::default(),
+        metadata: None,
+    };
+    durable
+        .create_schedule("my-schedule", options)
+        .await
+        .unwrap();
+
+    // Update with same name
+    let options2 = ScheduleOptions {
+        task_name: "task-v2".to_string(),
+        cron_expression: "*/10 * * * *".to_string(),
+        params: json!({"version": 2}),
+        spawn_options: SpawnOptions::default(),
+        metadata: None,
+    };
+    durable
+        .create_schedule("my-schedule", options2)
+        .await
+        .unwrap();
+
+    // Should still be just 1 schedule, but updated
+    let schedules = durable.list_schedules(None).await.unwrap();
+    assert_eq!(schedules.len(), 1);
+    assert_eq!(schedules[0].task_name, "task-v2");
+    assert_eq!(schedules[0].cron_expression, "*/10 * * * *");
+    assert_eq!(schedules[0].params, json!({"version": 2}));
+
+    cleanup_queue(&pool, queue).await;
+}
+
+#[tokio::test]
+async fn test_delete_schedule() {
+    let pool = setup_pool().await;
+    let queue = "test_cron_delete";
+
+    let durable = Durable::builder()
+        .pool(pool.clone())
+        .queue_name(queue)
+        .build()
+        .await
+        .unwrap();
+
+    durable.create_queue(None).await.unwrap();
+
+    let options = ScheduleOptions {
+        task_name: "cleanup-task".to_string(),
+        cron_expression: "0 0 * * *".to_string(),
+        params: json!({}),
+        spawn_options: SpawnOptions::default(),
+        metadata: None,
+    };
+    durable
+        .create_schedule("daily-cleanup", options)
+        .await
+        .unwrap();
+
+    // Verify it exists
+    let schedules = durable.list_schedules(None).await.unwrap();
+    assert_eq!(schedules.len(), 1);
+
+    // Delete it
+    durable.delete_schedule("daily-cleanup").await.unwrap();
+
+    // Verify it's gone
+    let schedules = durable.list_schedules(None).await.unwrap();
+    assert_eq!(schedules.len(), 0);
+
+    cleanup_queue(&pool, queue).await;
+}
+
+#[tokio::test]
+async fn test_delete_nonexistent_schedule() {
+    let pool = setup_pool().await;
+    let queue = "test_cron_delete_missing";
+
+    let durable = Durable::builder()
+        .pool(pool.clone())
+        .queue_name(queue)
+        .build()
+        .await
+        .unwrap();
+
+    durable.create_queue(None).await.unwrap();
+
+    let result = durable.delete_schedule("nonexistent").await;
+    assert!(result.is_err());
+    match result.unwrap_err() {
+        DurableError::ScheduleNotFound {
+            schedule_name,
+            queue_name,
+        } => {
+            assert_eq!(schedule_name, "nonexistent");
+            assert_eq!(queue_name, "test_cron_delete_missing");
+        }
+        other => panic!("expected ScheduleNotFound, got: {other:?}"),
+    }
+
+    cleanup_queue(&pool, queue).await;
+}
+
+#[tokio::test]
+async fn test_create_schedule_invalid_cron() {
+    let pool = setup_pool().await;
+    let queue = "test_cron_invalid";
+
+    let durable = Durable::builder()
+        .pool(pool.clone())
+        .queue_name(queue)
+        .build()
+        .await
+        .unwrap();
+
+    durable.create_queue(None).await.unwrap();
+
+    let options = ScheduleOptions {
+        task_name: "my-task".to_string(),
+        cron_expression: "invalid cron".to_string(),
+        params: json!({}),
+        spawn_options: SpawnOptions::default(),
+        metadata: None,
+    };
+
+    // pg_cron validates the expression and the transaction should fail
+    let result = durable.create_schedule("bad-cron", options).await;
+    assert!(result.is_err());
+
+    cleanup_queue(&pool, queue).await;
+}
+
+#[tokio::test]
+async fn test_schedule_injects_metadata_headers() {
+    let pool = setup_pool().await;
+    let queue = "test_cron_headers";
+
+    let durable = Durable::builder()
+        .pool(pool.clone())
+        .queue_name(queue)
+        .build()
+        .await
+        .unwrap();
+
+    durable.create_queue(None).await.unwrap();
+
+    let options = ScheduleOptions {
+        task_name: "header-task".to_string(),
+        cron_expression: "0 0 * * *".to_string(),
+        params: json!({}),
+        spawn_options: SpawnOptions::default(),
+        metadata: None,
+    };
+    durable
+        .create_schedule("header-test", options)
+        .await
+        .unwrap();
+
+    // Verify the spawn_options in the registry contain the durable:: headers
+    let schedules = durable.list_schedules(None).await.unwrap();
+    assert_eq!(schedules.len(), 1);
+
+    let spawn_opts = &schedules[0].spawn_options;
+    let headers = spawn_opts.get("headers").expect("should have headers");
+    assert_eq!(
+        headers.get("durable::scheduled_by"),
+        Some(&json!("header-test"))
+    );
+    assert_eq!(headers.get("durable::cron"), Some(&json!("0 0 * * *")));
+
+    cleanup_queue(&pool, queue).await;
+}
+
+#[tokio::test]
+async fn test_list_schedules_filter_by_metadata() {
+    let pool = setup_pool().await;
+    let queue = "test_cron_filter_meta";
+
+    let durable = Durable::builder()
+        .pool(pool.clone())
+        .queue_name(queue)
+        .build()
+        .await
+        .unwrap();
+
+    durable.create_queue(None).await.unwrap();
+
+    // Create schedules with different metadata
+    let mut meta_payments = HashMap::new();
+    meta_payments.insert("team".to_string(), json!("payments"));
+
+    let mut meta_billing = HashMap::new();
+    meta_billing.insert("team".to_string(), json!("billing"));
+
+    let options1 = ScheduleOptions {
+        task_name: "task-a".to_string(),
+        cron_expression: "*/5 * * * *".to_string(),
+        params: json!({}),
+        spawn_options: SpawnOptions::default(),
+        metadata: Some(meta_payments.clone()),
+    };
+    durable
+        .create_schedule("schedule-a", options1)
+        .await
+        .unwrap();
+
+    let options2 = ScheduleOptions {
+        task_name: "task-b".to_string(),
+        cron_expression: "*/10 * * * *".to_string(),
+        params: json!({}),
+        spawn_options: SpawnOptions::default(),
+        metadata: Some(meta_billing),
+    };
+    durable
+        .create_schedule("schedule-b", options2)
+        .await
+        .unwrap();
+
+    // Filter by payments team
+    let filter = ScheduleFilter {
+        metadata: Some(meta_payments),
+        ..Default::default()
+    };
+    let schedules = durable.list_schedules(Some(filter)).await.unwrap();
+    assert_eq!(schedules.len(), 1);
+    assert_eq!(schedules[0].name, "schedule-a");
+
+    // No filter returns both
+    let all = durable.list_schedules(None).await.unwrap();
+    assert_eq!(all.len(), 2);
+
+    cleanup_queue(&pool, queue).await;
+}
+
+#[tokio::test]
+async fn test_list_schedules_filter_by_task_name() {
+    let pool = setup_pool().await;
+    let queue = "test_cron_filter_task";
+
+    let durable = Durable::builder()
+        .pool(pool.clone())
+        .queue_name(queue)
+        .build()
+        .await
+        .unwrap();
+
+    durable.create_queue(None).await.unwrap();
+
+    let options1 = ScheduleOptions {
+        task_name: "process-orders".to_string(),
+        cron_expression: "*/5 * * * *".to_string(),
+        params: json!({}),
+        spawn_options: SpawnOptions::default(),
+        metadata: None,
+    };
+    durable.create_schedule("orders-1", options1).await.unwrap();
+
+    let options2 = ScheduleOptions {
+        task_name: "process-orders".to_string(),
+        cron_expression: "*/15 * * * *".to_string(),
+        params: json!({}),
+        spawn_options: SpawnOptions::default(),
+        metadata: None,
+    };
+    durable.create_schedule("orders-2", options2).await.unwrap();
+
+    let options3 = ScheduleOptions {
+        task_name: "send-reports".to_string(),
+        cron_expression: "0 9 * * *".to_string(),
+        params: json!({}),
+        spawn_options: SpawnOptions::default(),
+        metadata: None,
+    };
+    durable.create_schedule("reports", options3).await.unwrap();
+
+    // Filter by task name
+    let filter = ScheduleFilter {
+        task_name: Some("process-orders".to_string()),
+        ..Default::default()
+    };
+    let schedules = durable.list_schedules(Some(filter)).await.unwrap();
+    assert_eq!(schedules.len(), 2);
+    assert!(schedules.iter().all(|s| s.task_name == "process-orders"));
+
+    // Filter by different task name
+    let filter = ScheduleFilter {
+        task_name: Some("send-reports".to_string()),
+        ..Default::default()
+    };
+    let schedules = durable.list_schedules(Some(filter)).await.unwrap();
+    assert_eq!(schedules.len(), 1);
+    assert_eq!(schedules[0].name, "reports");
+
+    cleanup_queue(&pool, queue).await;
+}
+
+#[tokio::test]
+async fn test_list_schedules_combined_filter() {
+    let pool = setup_pool().await;
+    let queue = "test_cron_filter_combo";
+
+    let durable = Durable::builder()
+        .pool(pool.clone())
+        .queue_name(queue)
+        .build()
+        .await
+        .unwrap();
+
+    durable.create_queue(None).await.unwrap();
+
+    let mut meta_payments = HashMap::new();
+    meta_payments.insert("team".to_string(), json!("payments"));
+
+    let mut meta_billing = HashMap::new();
+    meta_billing.insert("team".to_string(), json!("billing"));
+
+    // Same task, different metadata
+    let options1 = ScheduleOptions {
+        task_name: "process-data".to_string(),
+        cron_expression: "*/5 * * * *".to_string(),
+        params: json!({}),
+        spawn_options: SpawnOptions::default(),
+        metadata: Some(meta_payments.clone()),
+    };
+    durable
+        .create_schedule("data-payments", options1)
+        .await
+        .unwrap();
+
+    let options2 = ScheduleOptions {
+        task_name: "process-data".to_string(),
+        cron_expression: "*/10 * * * *".to_string(),
+        params: json!({}),
+        spawn_options: SpawnOptions::default(),
+        metadata: Some(meta_billing.clone()),
+    };
+    durable
+        .create_schedule("data-billing", options2)
+        .await
+        .unwrap();
+
+    // Different task, same metadata
+    let options3 = ScheduleOptions {
+        task_name: "send-alerts".to_string(),
+        cron_expression: "0 * * * *".to_string(),
+        params: json!({}),
+        spawn_options: SpawnOptions::default(),
+        metadata: Some(meta_payments.clone()),
+    };
+    durable
+        .create_schedule("alerts-payments", options3)
+        .await
+        .unwrap();
+
+    // Filter by task + metadata
+    let filter = ScheduleFilter {
+        task_name: Some("process-data".to_string()),
+        metadata: Some(meta_payments),
+    };
+    let schedules = durable.list_schedules(Some(filter)).await.unwrap();
+    assert_eq!(schedules.len(), 1);
+    assert_eq!(schedules[0].name, "data-payments");
+
+    cleanup_queue(&pool, queue).await;
+}
+
+#[tokio::test]
+async fn test_pgcron_job_actually_spawns_task() {
+    let pool = setup_pool().await;
+    let queue = "test_cron_fires";
+
+    let durable = Durable::builder()
+        .pool(pool.clone())
+        .queue_name(queue)
+        .build()
+        .await
+        .unwrap();
+
+    durable.create_queue(None).await.unwrap();
+
+    // Schedule a job that fires every 2 seconds
+    let options = ScheduleOptions {
+        task_name: "cron-ping".to_string(),
+        cron_expression: "2 seconds".to_string(),
+        params: json!({"source": "cron"}),
+        spawn_options: SpawnOptions::default(),
+        metadata: None,
+    };
+    durable
+        .create_schedule("fast-schedule", options)
+        .await
+        .unwrap();
+
+    // Poll until pg_cron spawns a task; generous 15s cap since the launcher's first run can lag
+    let start = std::time::Instant::now();
+    let timeout = std::time::Duration::from_secs(15);
+    let mut count: i64 = 0;
+    while start.elapsed() < timeout {
+        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+        let row: (i64,) = sqlx::query_as(AssertSqlSafe(format!(
+            "SELECT COUNT(*) FROM durable.t_{queue}"
+        )))
+        .fetch_one(&pool)
+        .await
+        .unwrap();
+        count = row.0;
+        if count > 0 {
+            break;
+        }
+    }
+
+    assert!(count > 0, "pg_cron should have spawned at least one task");
+
+    // Verify the spawned task has the right task_name and params
+    let row = sqlx::query(AssertSqlSafe(format!(
+        "SELECT task_name, params FROM durable.t_{queue} LIMIT 1"
+    )))
+    .fetch_one(&pool)
+    .await
+    .unwrap();
+
+    let task_name: &str = row.get("task_name");
+    let params: serde_json::Value = row.get("params");
+    assert_eq!(task_name, "cron-ping");
+    assert_eq!(params, json!({"source": "cron"}));
+
+    cleanup_queue(&pool, queue).await;
+}