From b877b74a0d63247c9d8e5fc7fe3383365d782e95 Mon Sep 17 00:00:00 2001 From: Kasun Vithanage Date: Fri, 27 Feb 2026 01:24:23 +0530 Subject: [PATCH 1/2] feat: add workflow orchestration (call_workflow / start_workflow) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add parent-child workflow composition with two primitives: - call_workflow/3: synchronous child execution with wait/resume - start_workflow/3: fire-and-forget child execution Includes cascade cancellation, idempotent resume, parent notification on child completion/failure, and nested workflow support (A→B→C). --- lib/durable.ex | 18 + lib/durable/executor.ex | 208 +++++++++- lib/durable/executor/step_runner.ex | 10 +- lib/durable/orchestration.ex | 257 ++++++++++++ lib/durable/query.ex | 29 ++ test/durable/orchestration_test.exs | 599 ++++++++++++++++++++++++++++ 6 files changed, 1109 insertions(+), 12 deletions(-) create mode 100644 lib/durable/orchestration.ex create mode 100644 test/durable/orchestration_test.exs diff --git a/lib/durable.ex b/lib/durable.ex index 5587d30..9f2b6c5 100644 --- a/lib/durable.ex +++ b/lib/durable.ex @@ -284,6 +284,24 @@ defmodule Durable do Durable.Wait.send_event(workflow_id, event_name, payload) end + @doc """ + Lists child workflow executions for a parent workflow. + + ## Options + + - `:status` - Filter by status + + ## Examples + + children = Durable.list_children(parent_workflow_id) + running_children = Durable.list_children(parent_workflow_id, status: :running) + + """ + @spec list_children(String.t(), keyword()) :: [map()] + def list_children(parent_workflow_id, opts \\ []) do + Durable.Query.list_child_executions(parent_workflow_id, opts) + end + # Scheduling API @doc """ diff --git a/lib/durable/executor.ex b/lib/durable/executor.ex index f7faee8..aac36d0 100644 --- a/lib/durable/executor.ex +++ b/lib/durable/executor.ex @@ -80,6 +80,9 @@ defmodule Durable.Executor do }) |> Repo.update(config) + # Cascade cancel to child workflows + cancel_child_workflows(config, workflow_id) + :ok _execution -> @@ -308,9 +311,18 @@ defmodule Durable.Executor do case StepRunner.execute(step, data, exec.id, config) do {:ok, new_data} -> - # Save data as context and continue to next step with new_data + # Save data as context and continue to next step + # save_data_as_context merges orchestration keys from process dict {:ok, exec} = save_data_as_context(config, exec, new_data) - execute_steps_recursive(remaining_steps, exec, step_index, workflow_def, config, new_data) + # Pass the DB-persisted context forward (includes orchestration keys) + execute_steps_recursive( + remaining_steps, + exec, + step_index, + workflow_def, + config, + exec.context + ) {:decision, target_step, new_data} -> handle_decision_result( @@ -330,6 +342,10 @@ defmodule Durable.Executor do {:ok, exec} = save_data_as_context(config, exec, data) handle_wait_result(config, exec, wait_type, opts) + {:call_workflow, opts} -> + {:ok, exec} = save_data_as_context(config, exec, data) + handle_call_workflow(config, exec, opts) + {:error, error} -> handle_step_failure(exec, error, workflow_def, config) end @@ -376,6 +392,41 @@ defmodule Durable.Executor do defp handle_wait_result(config, exec, :wait_for_all, opts), do: {:waiting, handle_wait_for_all(config, exec, opts) |> elem(1)} + defp handle_wait_result(config, exec, :call_workflow, opts), + do: handle_call_workflow(config, exec, opts) + + # ============================================================================ + # Workflow Orchestration (call_workflow) + # ============================================================================ + + defp handle_call_workflow(config, execution, opts) do + child_id = Keyword.fetch!(opts, :child_id) + event_name = Durable.Orchestration.child_event_name(child_id) + timeout_at = calculate_timeout_at(opts) + + # Create pending event to wait for child completion + attrs = %{ + workflow_id: execution.id, + event_name: event_name, + step_name: execution.current_step, + timeout_at: timeout_at, + timeout_value: serialize_timeout_value(Keyword.get(opts, :timeout_value, :child_timeout)), + wait_type: :single + } + + {:ok, _} = + %PendingEvent{} + |> PendingEvent.changeset(attrs) + |> Repo.insert(config) + + {:ok, execution} = + execution + |> Ecto.Changeset.change(status: :waiting) + |> Repo.update(config) + + {:waiting, execution} + end + defp execute_branch( branch_step, remaining_steps, @@ -522,7 +573,15 @@ defmodule Durable.Executor do case StepRunner.execute(step, data, exec.id, config) do {:ok, new_data} -> {:ok, exec} = save_data_as_context(config, exec, new_data) - execute_branch_steps_sequential(rest, exec, step_index, workflow_def, config, new_data) + + execute_branch_steps_sequential( + rest, + exec, + step_index, + workflow_def, + config, + exec.context + ) {:decision, target_step, new_data} -> # Decisions within branches - save and return for outer handler @@ -549,6 +608,10 @@ defmodule Durable.Executor do {:ok, exec} = save_data_as_context(config, exec, data) {:waiting, handle_wait_for_all(config, exec, opts) |> elem(1)} + {:call_workflow, opts} -> + {:ok, exec} = save_data_as_context(config, exec, data) + handle_call_workflow(config, exec, opts) + {:error, error} -> handle_step_failure(exec, error, workflow_def, config) end @@ -820,6 +883,15 @@ defmodule Durable.Executor do }}} end + defp handle_parallel_step_result({:call_workflow, _opts}, returns_key) do + {:ok, returns_key, + {:error, + %{ + type: "parallel_call_workflow_not_supported", + message: "call_workflow not supported in parallel blocks" + }}} + end + defp await_parallel_tasks(tasks, :fail_fast), do: await_tasks_fail_fast(tasks) defp await_parallel_tasks(tasks, :complete_all), do: await_tasks_complete_all(tasks) defp await_parallel_tasks(tasks, _), do: await_tasks_complete_all(tasks) @@ -961,12 +1033,42 @@ defmodule Durable.Executor do end # Saves data as the workflow context in DB (for persistence/resume) + # Also merges orchestration keys from process dict to ensure child workflow + # references are persisted through DB round-trips defp save_data_as_context(config, execution, data) do + merged = merge_orchestration_context(data) + execution - |> Ecto.Changeset.change(context: data) + |> Ecto.Changeset.change(context: merged) |> Repo.update(config) end + # Merge orchestration keys (__child:*, __fire_forget:*, __child_done:*) from + # process dict into the data to persist. These keys are set by + # Durable.Orchestration.call_workflow/start_workflow via put_context. + defp merge_orchestration_context(data) do + process_ctx = Process.get(:durable_context, %{}) + + orchestration_keys = + process_ctx + |> Enum.filter(fn {key, _} -> orchestration_key?(key) end) + |> Map.new() + + Map.merge(data, orchestration_keys) + end + + defp orchestration_key?(key) when is_atom(key) do + orchestration_key?(Atom.to_string(key)) + end + + defp orchestration_key?(key) when is_binary(key) do + String.starts_with?(key, "__child:") or + String.starts_with?(key, "__fire_forget:") or + String.starts_with?(key, "__child_done:") + end + + defp orchestration_key?(_), do: false + defp mark_completed(config, execution, final_data) do {:ok, execution} = execution @@ -978,21 +1080,105 @@ defmodule Durable.Executor do |> Ecto.Changeset.change(locked_by: nil, locked_at: nil) |> Repo.update(config) + maybe_notify_parent(config, execution, :completed, final_data) + {:ok, execution} end defp mark_failed(config, execution, error) do - execution - |> WorkflowExecution.status_changeset(:failed, %{ - error: error, - completed_at: DateTime.utc_now() - }) - |> Ecto.Changeset.change(locked_by: nil, locked_at: nil) - |> Repo.update(config) + {:ok, execution} = + execution + |> WorkflowExecution.status_changeset(:failed, %{ + error: error, + completed_at: DateTime.utc_now() + }) + |> Ecto.Changeset.change(locked_by: nil, locked_at: nil) + |> Repo.update(config) + + maybe_notify_parent(config, execution, :failed, error) {:error, error} end + # ============================================================================ + # Parent Notification (Orchestration) + # ============================================================================ + + defp maybe_notify_parent(_config, %{parent_workflow_id: nil}, _status, _data), do: :ok + + defp maybe_notify_parent(config, execution, status, data) do + event_name = Durable.Orchestration.child_event_name(execution.id) + payload = Durable.Orchestration.build_result_payload(status, data) + + # Find and fulfill the pending event on the parent workflow + query = + from(p in PendingEvent, + where: + p.workflow_id == ^execution.parent_workflow_id and + p.event_name == ^event_name and + p.status == :pending + ) + + case Repo.one(config, query) do + nil -> + # Parent not waiting (fire-and-forget case, or already timed out) + :ok + + pending_event -> + # Fulfill the pending event + {:ok, _} = + pending_event + |> PendingEvent.receive_changeset(payload) + |> Repo.update(config) + + # Find the child ref from parent's context to store result under the right key + parent = Repo.get(config, WorkflowExecution, execution.parent_workflow_id) + result_context = build_parent_result_context(parent, execution.id, payload) + + # Resume the parent workflow + resume_workflow(execution.parent_workflow_id, result_context) + end + end + + # Build context update for parent with child result stored under the right key + defp build_parent_result_context(parent, child_id, payload) do + parent_context = parent.context || %{} + + # Find which ref this child belongs to by looking for __child:ref = child_id + ref = + Enum.find_value(parent_context, fn + {"__child:" <> ref_str, ^child_id} -> ref_str + _ -> nil + end) + + if ref do + %{ + "__child_done:#{ref}" => payload, + Durable.Orchestration.child_event_name(child_id) => payload + } + else + %{Durable.Orchestration.child_event_name(child_id) => payload} + end + end + + # ============================================================================ + # Cascade Cancellation (Orchestration) + # ============================================================================ + + defp cancel_child_workflows(config, parent_id) do + query = + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^parent_id, + where: w.status in [:pending, :running, :waiting] + ) + + children = Repo.all(config, query) + + Enum.each(children, fn child -> + cancel_workflow(child.id, "parent_cancelled", durable: config.name) + end) + end + # ============================================================================ # Compensation/Saga Support # ============================================================================ diff --git a/lib/durable/executor/step_runner.ex b/lib/durable/executor/step_runner.ex index a0c02ef..0b01a26 100644 --- a/lib/durable/executor/step_runner.ex +++ b/lib/durable/executor/step_runner.ex @@ -26,6 +26,7 @@ defmodule Durable.Executor.StepRunner do | {:wait_for_input, keyword()} | {:wait_for_any, keyword()} | {:wait_for_all, keyword()} + | {:call_workflow, keyword()} @doc """ Executes a step with retry logic. @@ -139,7 +140,14 @@ defmodule Durable.Executor.StepRunner do # Handle wait primitives (throws) defp handle_result({:throw, {wait_type, opts}}, ctx) - when wait_type in [:sleep, :wait_for_event, :wait_for_input, :wait_for_any, :wait_for_all] do + when wait_type in [ + :sleep, + :wait_for_event, + :wait_for_input, + :wait_for_any, + :wait_for_all, + :call_workflow + ] do %{step_exec: step_exec, config: config} = ctx {:ok, _} = update_step_execution(config, step_exec, :waiting) {wait_type, opts} diff --git a/lib/durable/orchestration.ex b/lib/durable/orchestration.ex new file mode 100644 index 0000000..49740e4 --- /dev/null +++ b/lib/durable/orchestration.ex @@ -0,0 +1,257 @@ +defmodule Durable.Orchestration do + @moduledoc """ + Workflow composition: call child workflows from parent steps. + + Provides two primitives for composing workflows: + + - `call_workflow/3` — Start a child workflow and wait for its result (synchronous) + - `start_workflow/3` — Start a child workflow without waiting (fire-and-forget) + + ## Usage + + defmodule MyApp.OrderWorkflow do + use Durable + use Durable.Context + use Durable.Orchestration + + workflow "process_order" do + step :charge_payment, fn data -> + case call_workflow(MyApp.PaymentWorkflow, %{"amount" => data.total}, + timeout: hours(1)) do + {:ok, result} -> + {:ok, assign(data, :payment_id, result["payment_id"])} + {:error, reason} -> + {:error, reason} + end + end + + step :send_email, fn data -> + {:ok, email_wf_id} = start_workflow(MyApp.EmailWorkflow, + %{"to" => data.email}, ref: :confirmation) + {:ok, assign(data, :email_workflow_id, email_wf_id)} + end + end + end + + """ + + alias Durable.Config + alias Durable.Context + alias Durable.Executor + alias Durable.Repo + alias Durable.Storage.Schemas.WorkflowExecution + + @doc """ + Injects orchestration functions into the calling module. + """ + defmacro __using__(_opts) do + quote do + import Durable.Orchestration, + only: [call_workflow: 2, call_workflow: 3, start_workflow: 2, start_workflow: 3] + end + end + + @doc """ + Starts a child workflow and waits for its result. + + The parent workflow will be suspended until the child completes or fails. + On resume, returns `{:ok, result}` or `{:error, reason}`. + + ## Options + + - `:ref` - Reference name for idempotency (default: module name) + - `:timeout` - Timeout in milliseconds + - `:timeout_value` - Value returned on timeout (default: `:child_timeout`) + - `:queue` - Queue for the child workflow (default: "default") + - `:durable` - Durable instance name (default: Durable) + + ## Examples + + case call_workflow(MyApp.PaymentWorkflow, %{"amount" => 100}, timeout: hours(1)) do + {:ok, result} -> {:ok, assign(data, :payment, result)} + {:error, reason} -> {:error, reason} + end + + """ + @spec call_workflow(module(), map(), keyword()) :: {:ok, map()} | {:error, term()} + def call_workflow(module, input, opts \\ []) do + parent_id = Context.workflow_id() + ref = Keyword.get(opts, :ref, module_to_ref(module)) + child_key = child_context_key(ref) + result_key = child_result_key(ref) + + context = Process.get(:durable_context, %{}) + + cond do + # Resume case: result already in context + Map.has_key?(context, result_key) -> + parse_child_result(Map.get(context, result_key)) + + # Resume case: child exists but no result yet + Map.has_key?(context, child_key) -> + child_id = Map.get(context, child_key) + handle_existing_child(child_id, opts) + + # First execution: create child and throw to wait + true -> + create_and_wait(module, input, parent_id, child_key, opts) + end + end + + @doc """ + Starts a child workflow without waiting for its result (fire-and-forget). + + Returns `{:ok, child_id}` immediately. The child runs independently. + Idempotent: if resumed, returns the same child_id without creating a duplicate. + + ## Options + + - `:ref` - Reference name for idempotency (default: module name) + - `:queue` - Queue for the child workflow (default: "default") + - `:durable` - Durable instance name (default: Durable) + + ## Examples + + {:ok, child_id} = start_workflow(MyApp.EmailWorkflow, + %{"to" => email}, ref: :welcome_email) + + """ + @spec start_workflow(module(), map(), keyword()) :: {:ok, String.t()} + def start_workflow(module, input, opts \\ []) do + parent_id = Context.workflow_id() + ref = Keyword.get(opts, :ref, module_to_ref(module)) + fire_key = fire_forget_key(ref) + + context = Process.get(:durable_context, %{}) + + if Map.has_key?(context, fire_key) do + # Idempotent: already created + {:ok, Map.get(context, fire_key)} + else + # Create child and continue (no throw) + {:ok, child_id} = create_child_execution(module, input, parent_id, opts) + Context.put_context(fire_key, child_id) + {:ok, child_id} + end + end + + # ============================================================================ + # Internal helpers + # ============================================================================ + + defp create_and_wait(module, input, parent_id, child_key, opts) do + {:ok, child_id} = create_child_execution(module, input, parent_id, opts) + Context.put_context(child_key, child_id) + + throw( + {:call_workflow, + child_id: child_id, + timeout: Keyword.get(opts, :timeout), + timeout_value: Keyword.get(opts, :timeout_value, :child_timeout)} + ) + end + + defp handle_existing_child(child_id, opts) do + durable_name = Keyword.get(opts, :durable, Durable) + config = Config.get(durable_name) + + case Repo.get(config, WorkflowExecution, child_id) do + nil -> + {:error, :child_not_found} + + %{status: :completed} = child -> + parse_child_result(build_result_payload(:completed, child.context)) + + %{status: status} = child when status in [:failed, :cancelled, :compensation_failed] -> + parse_child_result(build_result_payload(:failed, child.error)) + + _child -> + # Still running/waiting — re-throw to wait again + throw( + {:call_workflow, + child_id: child_id, + timeout: Keyword.get(opts, :timeout), + timeout_value: Keyword.get(opts, :timeout_value, :child_timeout)} + ) + end + end + + defp create_child_execution(module, input, parent_id, opts) do + durable_name = Keyword.get(opts, :durable, Durable) + config = Config.get(durable_name) + + {:ok, workflow_def} = get_child_workflow_def(module, opts) + + attrs = %{ + workflow_module: Atom.to_string(module), + workflow_name: workflow_def.name, + status: :pending, + queue: Keyword.get(opts, :queue, "default") |> to_string(), + priority: Keyword.get(opts, :priority, 0), + input: input, + context: %{}, + parent_workflow_id: parent_id + } + + {:ok, execution} = + %WorkflowExecution{} + |> WorkflowExecution.changeset(attrs) + |> Repo.insert(config) + + # For inline execution (testing), execute the child immediately + if Keyword.get(opts, :inline, false) do + Executor.execute_workflow(execution.id, config) + end + + {:ok, execution.id} + end + + defp get_child_workflow_def(module, opts) do + case Keyword.get(opts, :workflow) do + nil -> module.__default_workflow__() + name -> module.__workflow_definition__(name) + end + end + + @doc false + def child_context_key(ref), do: :"__child:#{ref}" + + @doc false + def child_result_key(ref), do: :"__child_done:#{ref}" + + @doc false + def child_event_name(child_id), do: "__child_done:#{child_id}" + + @doc false + def fire_forget_key(ref), do: :"__fire_forget:#{ref}" + + @doc false + def build_result_payload(status, data) do + %{ + "status" => Atom.to_string(status), + "result" => data + } + end + + @doc false + def parse_child_result(%{"status" => "completed", "result" => result}) do + {:ok, result} + end + + def parse_child_result(%{"status" => status, "result" => result}) + when status in ["failed", "cancelled", "compensation_failed"] do + {:error, result} + end + + def parse_child_result(other) do + {:error, {:unexpected_child_result, other}} + end + + defp module_to_ref(module) do + module + |> Module.split() + |> List.last() + |> Macro.underscore() + |> String.to_atom() + end +end diff --git a/lib/durable/query.ex b/lib/durable/query.ex index 552dc90..412dcf9 100644 --- a/lib/durable/query.ex +++ b/lib/durable/query.ex @@ -129,6 +129,35 @@ defmodule Durable.Query do end end + @doc """ + Lists child workflow executions for a parent workflow. + + ## Options + + - `:status` - Filter by status + - `:durable` - The Durable instance name (default: Durable) + + """ + @spec list_child_executions(String.t(), keyword()) :: [map()] + def list_child_executions(parent_workflow_id, opts \\ []) do + config = get_config(opts) + + query = + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^parent_workflow_id, + order_by: [asc: w.inserted_at] + ) + + query = + case Keyword.get(opts, :status) do + nil -> query + status -> from(w in query, where: w.status == ^status) + end + + Repo.all(config, query) + |> Enum.map(&execution_to_map(&1, false, false)) + end + # Private functions defp get_config(opts) do diff --git a/test/durable/orchestration_test.exs b/test/durable/orchestration_test.exs new file mode 100644 index 0000000..f7a68fc --- /dev/null +++ b/test/durable/orchestration_test.exs @@ -0,0 +1,599 @@ +defmodule Durable.OrchestrationTest do + @moduledoc """ + Tests for workflow orchestration (call_workflow/start_workflow). + + Tests cover: + - call_workflow: sync child, child completes/fails, timeout, resume, idempotency + - start_workflow: fire-and-forget, idempotency, independent completion + - Cascade cancellation + - Nesting (A calls B calls C) + - Integration with branches + """ + use Durable.DataCase, async: false + + alias Durable.Config + alias Durable.Executor + alias Durable.Storage.Schemas.{PendingEvent, WorkflowExecution} + + import Ecto.Query + + # ============================================================================ + # call_workflow Tests + # ============================================================================ + + describe "call_workflow/3" do + test "parent calls child, child completes, parent gets result" do + config = Config.get(Durable) + repo = config.repo + + # Start parent — it will create child and go to :waiting + {:ok, parent} = create_and_execute_workflow(CallWorkflowParent, %{}) + + assert parent.status == :waiting + assert parent.context["before_call"] == true + + # Child should exist with parent_workflow_id set + child_id = parent.context["__child:simple_child_workflow"] + assert child_id != nil + + child = repo.get!(WorkflowExecution, child_id) + assert child.parent_workflow_id == parent.id + assert child.status == :pending + + # A pending event should exist for child completion + event_name = "__child_done:#{child_id}" + pending = get_pending_event(repo, parent.id, event_name) + assert pending != nil + assert pending.status == :pending + + # Execute the child workflow + Executor.execute_workflow(child_id, config) + + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :completed + assert child.context["child_result"] == "done" + + # Parent should have been resumed — execute it + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :pending + + Executor.execute_workflow(parent.id, config) + + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :completed + assert parent.context["call_result"] != nil + assert parent.context["after_call"] == true + end + + test "parent calls child, child fails, parent gets error" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(CallWorkflowFailingParent, %{}) + + assert parent.status == :waiting + + child_id = parent.context["__child:failing_child_workflow"] + assert child_id != nil + + # Execute the failing child + Executor.execute_workflow(child_id, config) + + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :failed + + # Parent should be resumed + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :pending + + Executor.execute_workflow(parent.id, config) + + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :completed + assert parent.context["got_error"] == true + end + + test "idempotency: resumed parent does not create duplicate child" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(CallWorkflowParent, %{}) + + assert parent.status == :waiting + child_id = parent.context["__child:simple_child_workflow"] + + # Count children before + children_before = + repo.all(from(w in WorkflowExecution, where: w.parent_workflow_id == ^parent.id)) + + assert length(children_before) == 1 + + # Execute child to complete it + Executor.execute_workflow(child_id, config) + + # Resume parent — it should find the result, not create another child + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :pending + + Executor.execute_workflow(parent.id, config) + + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :completed + + # Still only 1 child + children_after = + repo.all(from(w in WorkflowExecution, where: w.parent_workflow_id == ^parent.id)) + + assert length(children_after) == 1 + end + + test "resume: child still running causes parent to re-wait" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(CallWorkflowParent, %{}) + + assert parent.status == :waiting + child_id = parent.context["__child:simple_child_workflow"] + + # Manually set parent back to pending to simulate crash-resume + # without child completing + parent_exec = repo.get!(WorkflowExecution, parent.id) + + parent_exec + |> Ecto.Changeset.change(status: :pending) + |> repo.update!() + + # Delete the pending event so it won't block + repo.delete_all(from(p in PendingEvent, where: p.workflow_id == ^parent.id)) + + # Re-execute parent — child is still pending, should re-wait + Executor.execute_workflow(parent.id, config) + + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :waiting + + # Now execute child + Executor.execute_workflow(child_id, config) + + # Resume parent + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :pending + + Executor.execute_workflow(parent.id, config) + + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :completed + end + end + + # ============================================================================ + # start_workflow Tests + # ============================================================================ + + describe "start_workflow/3" do + test "fire-and-forget: parent continues, child created with parent_workflow_id" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(FireForgetParent, %{}) + + # Parent should complete (no waiting) + assert parent.status == :completed + assert parent.context["before_fire"] == true + assert parent.context["after_fire"] == true + + child_id = parent.context["__fire_forget:confirmation_email"] + assert child_id != nil + + child = repo.get!(WorkflowExecution, child_id) + assert child.parent_workflow_id == parent.id + assert child.status == :pending + + # Execute child independently + Executor.execute_workflow(child_id, config) + + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :completed + end + + test "idempotency: same ref returns same child_id on resume" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(FireForgetIdempotentParent, %{}) + + assert parent.status == :completed + + # Both start_workflow calls in the step used the same ref + # Only one child should exist + children = + repo.all(from(w in WorkflowExecution, where: w.parent_workflow_id == ^parent.id)) + + # The step calls start_workflow twice with different refs + assert length(children) == 2 + assert parent.context["child1_id"] != parent.context["child2_id"] + end + + test "multiple fire-and-forget children with distinct refs" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(FireForgetIdempotentParent, %{}) + + assert parent.status == :completed + + children = + repo.all( + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^parent.id, + order_by: [asc: :inserted_at] + ) + ) + + assert length(children) == 2 + + Enum.each(children, fn child -> + assert child.parent_workflow_id == parent.id + + # Execute each child + Executor.execute_workflow(child.id, config) + child = repo.get!(WorkflowExecution, child.id) + assert child.status == :completed + end) + end + end + + # ============================================================================ + # Cascade Cancellation Tests + # ============================================================================ + + describe "cascade cancellation" do + test "cancel parent cancels pending children" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(CallWorkflowParent, %{}) + + assert parent.status == :waiting + child_id = parent.context["__child:simple_child_workflow"] + + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :pending + + # Cancel parent + :ok = Executor.cancel_workflow(parent.id, "test_cancel") + + # Child should be cancelled too + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :cancelled + assert child.error["message"] == "parent_cancelled" + end + + test "cancel parent does not affect completed children" do + config = Config.get(Durable) + repo = config.repo + + # Use fire-and-forget so parent completes + {:ok, parent} = create_and_execute_workflow(FireForgetParent, %{}) + + child_id = parent.context["__fire_forget:confirmation_email"] + + # Complete the child first + Executor.execute_workflow(child_id, config) + + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :completed + + # Now create another workflow that we can cancel + # (The fire-forget parent already completed, so let's test with call_workflow) + {:ok, parent2} = create_and_execute_workflow(CallWorkflowParent, %{}) + + child2_id = parent2.context["__child:simple_child_workflow"] + + # Complete child2 + Executor.execute_workflow(child2_id, config) + + child2 = repo.get!(WorkflowExecution, child2_id) + assert child2.status == :completed + + # Cancel parent (which is now pending after child resumed it) + parent2 = repo.get!(WorkflowExecution, parent2.id) + assert parent2.status == :pending + + :ok = Executor.cancel_workflow(parent2.id, "test_cancel") + + # Already-completed child should remain completed + child2 = repo.get!(WorkflowExecution, child2_id) + assert child2.status == :completed + end + end + + # ============================================================================ + # Nesting Tests + # ============================================================================ + + describe "nesting" do + test "A calls B calls C — full chain completes" do + config = Config.get(Durable) + repo = config.repo + + # Start A (grandparent) + {:ok, a} = create_and_execute_workflow(GrandparentWorkflow, %{}) + + assert a.status == :waiting + + # B (child of A) should exist + b_id = a.context["__child:parent_workflow"] + assert b_id != nil + + # Execute B — it will call C and wait + Executor.execute_workflow(b_id, config) + + b = repo.get!(WorkflowExecution, b_id) + assert b.status == :waiting + + # C (child of B) should exist + c_id = b.context["__child:simple_child_workflow"] + assert c_id != nil + + c = repo.get!(WorkflowExecution, c_id) + assert c.parent_workflow_id == b_id + + # Execute C + Executor.execute_workflow(c_id, config) + + c = repo.get!(WorkflowExecution, c_id) + assert c.status == :completed + + # B should be resumed + b = repo.get!(WorkflowExecution, b_id) + assert b.status == :pending + + Executor.execute_workflow(b_id, config) + + b = repo.get!(WorkflowExecution, b_id) + assert b.status == :completed + + # A should be resumed + a = repo.get!(WorkflowExecution, a.id) + assert a.status == :pending + + Executor.execute_workflow(a.id, config) + + a = repo.get!(WorkflowExecution, a.id) + assert a.status == :completed + assert a.context["chain_complete"] == true + end + end + + # ============================================================================ + # Integration Tests + # ============================================================================ + + describe "integration" do + test "start_workflow inside step works (no blocking)" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(FireForgetParent, %{}) + + assert parent.status == :completed + + # Child exists and can run independently + child_id = parent.context["__fire_forget:confirmation_email"] + assert child_id != nil + + Executor.execute_workflow(child_id, config) + + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :completed + end + + test "list_children returns child workflows" do + config = Config.get(Durable) + + {:ok, parent} = create_and_execute_workflow(FireForgetIdempotentParent, %{}) + + children = Durable.list_children(parent.id) + assert length(children) == 2 + + Enum.each(children, fn child -> + Executor.execute_workflow(child.id, config) + end) + + completed_children = Durable.list_children(parent.id, status: :completed) + assert length(completed_children) == 2 + + pending_children = Durable.list_children(parent.id, status: :pending) + assert pending_children == [] + end + end + + # ============================================================================ + # Helpers + # ============================================================================ + + defp create_and_execute_workflow(module, input) do + config = Config.get(Durable) + repo = config.repo + {:ok, workflow_def} = module.__default_workflow__() + + attrs = %{ + workflow_module: Atom.to_string(module), + workflow_name: workflow_def.name, + status: :pending, + queue: "default", + priority: 0, + input: input, + context: %{} + } + + {:ok, execution} = + %WorkflowExecution{} + |> WorkflowExecution.changeset(attrs) + |> repo.insert() + + Executor.execute_workflow(execution.id, config) + {:ok, repo.get!(WorkflowExecution, execution.id)} + end + + defp get_pending_event(repo, workflow_id, event_name) do + repo.one( + from(p in PendingEvent, + where: p.workflow_id == ^workflow_id and p.event_name == ^event_name + ) + ) + end +end + +# ============================================================================ +# Test Workflow Modules +# ============================================================================ + +defmodule SimpleChildWorkflow do + use Durable + use Durable.Helpers + + workflow "simple_child" do + step(:do_work, fn data -> + {:ok, assign(data, :child_result, "done")} + end) + end +end + +defmodule FailingChildWorkflow do + use Durable + use Durable.Helpers + + workflow "failing_child" do + step(:fail_step, fn _data -> + {:error, %{type: "child_failure", message: "child failed on purpose"}} + end) + end +end + +defmodule CallWorkflowParent do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "call_parent" do + step(:before, fn data -> + {:ok, assign(data, :before_call, true)} + end) + + step(:call_child, fn data -> + case call_workflow(SimpleChildWorkflow, %{}) do + {:ok, result} -> + {:ok, assign(data, :call_result, result)} + + {:error, reason} -> + {:ok, assign(data, :call_error, reason)} + end + end) + + step(:after, fn data -> + {:ok, assign(data, :after_call, true)} + end) + end +end + +defmodule CallWorkflowFailingParent do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "call_failing_parent" do + step(:call_failing_child, fn data -> + case call_workflow(FailingChildWorkflow, %{}) do + {:ok, result} -> + {:ok, assign(data, :call_result, result)} + + {:error, _reason} -> + {:ok, assign(data, :got_error, true)} + end + end) + end +end + +defmodule FireForgetParent do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "fire_forget_parent" do + step(:before, fn data -> + {:ok, assign(data, :before_fire, true)} + end) + + step(:fire_child, fn data -> + {:ok, child_id} = + start_workflow(SimpleChildWorkflow, %{}, ref: :confirmation_email) + + {:ok, assign(data, :child_id, child_id)} + end) + + step(:after, fn data -> + {:ok, assign(data, :after_fire, true)} + end) + end +end + +defmodule FireForgetIdempotentParent do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "fire_forget_idempotent" do + step(:fire_two_children, fn data -> + {:ok, child1_id} = + start_workflow(SimpleChildWorkflow, %{}, ref: :email1) + + {:ok, child2_id} = + start_workflow(SimpleChildWorkflow, %{}, ref: :email2) + + data + |> assign(:child1_id, child1_id) + |> assign(:child2_id, child2_id) + |> then(&{:ok, &1}) + end) + end +end + +# Nested: A -> B -> C +defmodule ParentWorkflow do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "parent_wf" do + step(:call_simple, fn data -> + case call_workflow(SimpleChildWorkflow, %{}) do + {:ok, result} -> + {:ok, assign(data, :child_result, result)} + + {:error, reason} -> + {:ok, assign(data, :child_error, reason)} + end + end) + end +end + +defmodule GrandparentWorkflow do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "grandparent_wf" do + step(:call_parent, fn data -> + case call_workflow(ParentWorkflow, %{}) do + {:ok, result} -> + {:ok, data |> assign(:nested_result, result) |> assign(:chain_complete, true)} + + {:error, reason} -> + {:ok, assign(data, :nested_error, reason)} + end + end) + end +end From 83bf0a799d063d767a6f25e304763ae4671fd0ba Mon Sep 17 00:00:00 2001 From: Kasun Vithanage Date: Fri, 27 Feb 2026 01:24:41 +0530 Subject: [PATCH 2/2] docs: add orchestration guide and update README/WORKPLAN Add guides/orchestration.md with examples for call_workflow and start_workflow. Update README with orchestration in features list, API reference, and guides section. Update WORKPLAN to mark orchestration complete (~268 tests). --- README.md | 19 +- ...LAN.md => IMPLEMENTATION_PLAN_ARCHIVED.md} | 8 + agents/WORKPLAN.md | 108 ++++-- agents/arch.md | 111 +++--- agents/context-index.md | 40 +- guides/orchestration.md | 341 ++++++++++++++++++ 6 files changed, 540 insertions(+), 87 deletions(-) rename agents/{IMPLEMENTATION_PLAN.md => IMPLEMENTATION_PLAN_ARCHIVED.md} (99%) create mode 100644 guides/orchestration.md diff --git a/README.md b/README.md index 2ab70ca..0d533ba 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ A durable, resumable workflow engine for Elixir. Similar to Temporal/Inngest. - **Compensations** - Saga pattern with automatic rollback - **Cron Scheduling** - Recurring workflows with cron expressions - **Reliability** - Automatic retries with exponential/linear/constant backoff +- **Orchestration** - Parent/child workflow composition - **Persistence** - PostgreSQL-backed execution state ## Installation @@ -439,6 +440,21 @@ hours(2) # 7_200_000 ms days(7) # 604_800_000 ms ``` +### Orchestration + +```elixir +use Durable.Orchestration + +# Synchronous: call child and wait for result +case call_workflow(MyApp.PaymentWorkflow, %{"amount" => 100}, timeout: hours(1)) do + {:ok, result} -> {:ok, assign(data, :payment, result)} + {:error, reason} -> {:error, reason} +end + +# Fire-and-forget: start child and continue +{:ok, child_id} = start_workflow(MyApp.EmailWorkflow, %{"to" => email}, ref: :welcome) +``` + ### API ```elixir @@ -449,6 +465,7 @@ Durable.list_executions(workflow: Module, status: :running) Durable.cancel(id, "reason") Durable.send_event(id, "event", payload) Durable.provide_input(id, "input_name", data) +Durable.list_children(parent_id) ``` ## Guides @@ -457,10 +474,10 @@ Durable.provide_input(id, "input_name", data) - [Parallel](guides/parallel.md) - Concurrent execution - [Compensations](guides/compensations.md) - Saga pattern - [Waiting](guides/waiting.md) - Sleep, events, human input +- [Orchestration](guides/orchestration.md) - Parent/child workflow composition ## Coming Soon -- Workflow orchestration (parent/child workflows) - Phoenix LiveView dashboard ## License diff --git a/agents/IMPLEMENTATION_PLAN.md b/agents/IMPLEMENTATION_PLAN_ARCHIVED.md similarity index 99% rename from agents/IMPLEMENTATION_PLAN.md rename to agents/IMPLEMENTATION_PLAN_ARCHIVED.md index 4c39e87..0adde58 100644 --- a/agents/IMPLEMENTATION_PLAN.md +++ b/agents/IMPLEMENTATION_PLAN_ARCHIVED.md @@ -1,5 +1,13 @@ # DurableWorkflow Implementation Plan +> **⚠️ ARCHIVED (2026-01-23):** This document is no longer maintained. +> See `WORKPLAN.md` for current status and `arch.md` for technical reference. +> +> Key changes since this document was written: +> - ForEach primitive was **removed** (use `Enum.map` instead) +> - Parallel execution uses new results model (`__results__`, `into:`, `returns:`) +> - Loop primitive was **never implemented** (use step retries or `Enum` functions) + ## Executive Summary This document outlines the complete implementation plan for **Durable**, a durable, resumable workflow engine for Elixir. diff --git a/agents/WORKPLAN.md b/agents/WORKPLAN.md index dfa85bf..dc89d8b 100644 --- a/agents/WORKPLAN.md +++ b/agents/WORKPLAN.md @@ -1,8 +1,8 @@ # Durable Workflow Engine - Work Plan -**Last Updated:** 2026-01-03 +**Last Updated:** 2026-02-27 **Overall Progress:** ~75% Complete -**Reference:** See `IMPLEMENTATION_PLAN.md` for detailed design and code examples +**Reference:** See `arch.md` for technical architecture --- @@ -10,8 +10,8 @@ | Metric | Value | |--------|-------| -| Source Modules | 41 | -| Passing Tests | 214 | +| Source Modules | 40 | +| Passing Tests | ~268 | | Documentation Guides | 6 | | Lines of Code | ~8,500 | @@ -24,7 +24,7 @@ | 0 | Project Foundation | Complete | 100% | | 1 | Core MVP | Complete | 100% | | 2 | Observability | Partial | 40% | -| 3 | Advanced Features | Mostly Complete | 85% | +| 3 | Advanced Features | Mostly Complete | 90% | | 4 | Scalability | Not Started | 0% | | 5 | Developer Experience | Partial | 35% | @@ -86,7 +86,7 @@ --- -## Phase 3: Advanced Features [85%] +## Phase 3: Advanced Features [90%] ### 3.1-3.3 Wait Primitives [COMPLETE - 46 tests] @@ -117,26 +117,27 @@ ### 3.5 Loops [SKIPPED] -Intentionally skipped - use step-level retries or `foreach` instead. +Intentionally skipped - use step-level retries or Elixir's `Enum` functions instead. ### 3.6 Parallel Execution [COMPLETE - 13 tests] | Feature | Status | |---------|--------| | `parallel do` macro | Complete | -| Context merge strategies | Complete | +| Results model (`__results__`) | Complete | +| `into:` custom merge function | Complete | +| `returns:` option | Complete | | Error strategies | Complete | | Resume durability | Complete | -### 3.7 ForEach [COMPLETE - 13 tests] +See `guides/parallel.md` for comprehensive documentation. -| Feature | Status | -|---------|--------| -| `foreach` macro | Complete | -| `current_item/0`, `current_index/0` | Complete | -| Sequential/Concurrent modes | Complete | -| `:collect_as` option | Complete | -| `:on_error` handling | Complete | +### 3.7 ForEach [REMOVED] + +**Decision (2026-01-23):** The `foreach` primitive was removed. Users should use +Elixir's built-in enumeration tools (`Enum.map`, `Task.async_stream`) for batch +processing instead. This simplifies the DSL while providing the same functionality +through idiomatic Elixir. ### 3.8 Switch/Case [NOT STARTED] @@ -165,12 +166,24 @@ Low priority - `branch` macro covers most cases. | Manual trigger | Complete | | Telemetry events | Complete | +### 3.11 Workflow Orchestration [COMPLETE - 12 tests] + +| Feature | Status | +|---------|--------| +| `call_workflow/3` (synchronous) | Complete | +| `start_workflow/3` (fire-and-forget) | Complete | +| Idempotent resume | Complete | +| Cascade cancellation | Complete | +| Parent notification on child complete/fail | Complete | +| Nested workflows (A → B → C) | Complete | +| `Durable.list_children/2` API | Complete | + +See `guides/orchestration.md` for comprehensive documentation. + ### Remaining Phase 3 Work | Feature | Priority | Complexity | |---------|----------|------------| -| Workflow Orchestration (`call_workflow`) | High | Medium | -| Parent-child tracking | High | Low | | Switch/Case macro | Low | Low | | Pipe-based API | Low | Medium | @@ -204,7 +217,7 @@ Note: Multi-node scheduling already works via `FOR UPDATE SKIP LOCKED`. | Module docs (@moduledoc) | Complete | | Function docs (@doc) | Complete | | Typespecs (@spec) | Complete | -| 6 Documentation Guides | Complete | +| 7 Documentation Guides | Complete | ### Remaining @@ -229,25 +242,24 @@ Note: Multi-node scheduling already works via `FOR UPDATE SKIP LOCKED`. ### High Priority 1. Guide: Getting Started -2. Workflow Orchestration (`call_workflow`) -3. HexDocs Publishing -4. `mix durable.status` +2. HexDocs Publishing +3. `mix durable.status` ### Medium Priority -5. Guide: Testing Workflows -6. `Durable.TestCase` -7. Graph Generation -8. `mix durable.list` -9. pg_notify Message Bus +4. Guide: Testing Workflows +5. `Durable.TestCase` +6. Graph Generation +7. `mix durable.list` +8. pg_notify Message Bus ### Lower Priority -10. Switch/Case macro -11. Redis Queue Adapter -12. Phoenix Dashboard -13. Example Project -14. Pipe-based API +9. Switch/Case macro +10. Redis Queue Adapter +11. Phoenix Dashboard +12. Example Project +13. Pipe-based API --- @@ -259,30 +271,50 @@ Note: Multi-node scheduling already works via `FOR UPDATE SKIP LOCKED`. | wait_test.exs | 46 | Wait primitives | | decision_test.exs | 13 | Decision steps | | parallel_test.exs | 13 | Parallel execution | -| foreach_test.exs | 13 | ForEach iteration | | log_capture_test.exs | 13 | Log/IO capture | | integration_test.exs | 11 | End-to-end flows | | branch_test.exs | 10 | Branch macro | | durable_test.exs | 8 | Core API | | compensation_test.exs | 6 | Saga pattern | | Other | ~36 | Queue, handlers, etc. | -| **Total** | **214** | | +| orchestration_test.exs | 12 | Workflow orchestration | +| **Total** | **~268** | | --- ## Known Limitations -1. Wait primitives not supported in parallel/foreach blocks +1. Wait primitives not supported in parallel blocks 2. No backward jumps in decision steps (forward-only by design) 3. Context is single-level atomized (top-level keys only) 4. No workflow versioning +5. No foreach/loop DSL primitives (use Elixir's `Enum` functions) --- ## Next Steps 1. **Documentation** - Getting Started guide and HexDocs publishing -2. **Workflow Orchestration** - Child workflow support (`call_workflow`) -3. **Graph Visualization** - Understanding complex workflows +2. **Graph Visualization** - Understanding complex workflows +3. **Testing Helpers** - `Durable.TestCase` for easier workflow testing + +The existing ~268 tests provide good confidence in implemented features. Suitable for internal use; additional documentation needed before public release. + +--- -The existing 214 tests provide good confidence in implemented features. Suitable for internal use; additional documentation needed before public release. +## Changelog + +### 2026-02-27 +- Added workflow orchestration: `call_workflow/3` (synchronous) and `start_workflow/3` (fire-and-forget) +- Added `Durable.Orchestration` module with `use Durable.Orchestration` macro +- Added cascade cancellation (cancelling parent cancels active children) +- Added parent notification on child completion/failure +- Added `Durable.list_children/2` API +- Added `guides/orchestration.md` documentation +- 12 new tests for orchestration (total: ~268) + +### 2026-01-23 +- Removed `foreach` primitive (use `Enum.map` or `Task.async_stream` instead) +- Updated parallel execution with new results model (`__results__`, `into:`, `returns:`) +- Updated documentation in `guides/parallel.md` +- Archived `IMPLEMENTATION_PLAN.md` (now `IMPLEMENTATION_PLAN_ARCHIVED.md`) diff --git a/agents/arch.md b/agents/arch.md index fcb9301..00d70fb 100644 --- a/agents/arch.md +++ b/agents/arch.md @@ -183,59 +183,67 @@ end Note: The `branch` macro is preferred for new workflows as it's more readable and allows multiple steps per branch. -### Loops - -```elixir -loop :retry_until_success, - while: fn ctx -> !ctx.success && ctx.current_retry < ctx.max_retries end do - - step :attempt_api_call do - case ExternalAPI.call() do - {:ok, result} -> put_context(:success, true) - {:error, _} -> update_context(:current_retry, & &1 + 1) - end - end - - step :backoff do - unless context().success do - delay = :math.pow(2, context().current_retry) |> round() - sleep_for(seconds: delay) - end - end -end -``` - ### Parallel Execution +Execute multiple steps concurrently. Results are stored in a structured format. + ```elixir parallel do step :send_welcome_email do - EmailService.send_welcome(state().user.email) + EmailService.send_welcome(get_context(:user).email) end - + step :provision_workspace do - WorkspaceService.create(state().user.id) + WorkspaceService.create(get_context(:user_id)) end - + step :create_stripe_customer do - StripeService.create_customer(state().user) + StripeService.create_customer(get_context(:user)) + end +end + +# Results automatically stored as: +# context.__results__ = %{ +# send_welcome_email: {:ok, email_result}, +# provision_workspace: {:ok, workspace_result}, +# create_stripe_customer: {:ok, stripe_result} +# } + +# Access results in subsequent steps: +step :finalize do + results = parallel_results() # Get all results + email = parallel_result(:send_welcome_email) # Get specific result + + if parallel_ok?(:provision_workspace) do + # All good end end ``` -### ForEach +**Custom result handling with `into:`:** ```elixir -foreach :process_items, items: fn -> context().items end do |item| - step :process_item do - result = ItemProcessor.process(item) - append_context(:results, result) - end +parallel into: fn ctx, results -> + # Custom merge function receives context and results map + successful = Enum.filter(results, fn {_k, v} -> match?({:ok, _}, v) end) + put_context(:successful_count, length(successful)) +end do + step :task_a do ... end + step :task_b do ... end end ``` +**Options:** +- `into:` - Custom function to merge results into context +- `returns:` - Specify which step's result to return (`:first_completed`, `:all`, or step name) +- Error handling: Failures are captured in results, workflow continues + +See `guides/parallel.md` for comprehensive documentation. + ### Workflow Orchestration +> **Note:** Not yet implemented. This is a planned feature. + Call child workflows from parent steps to compose larger workflows: ```elixir @@ -267,27 +275,30 @@ workflow "order_pipeline" do end ``` -**Options:** +**Planned Options:** - `call_workflow/3` - Start child and wait for result - `start_workflow/3` - Fire-and-forget - Parent-child relationships tracked via `parent_workflow_id` ### Switch/Case +> **Note:** Not yet implemented. Use the `branch` macro instead for conditional execution. + ```elixir +# PLANNED - NOT IMPLEMENTED switch :route_by_category, on: fn -> context().category end do case_match "billing" do step :assign_to_billing do TicketService.assign(input().ticket, team: :billing) end end - + case_match "technical" do step :assign_to_engineering do TicketService.assign(input().ticket, team: :engineering) end end - + default do step :assign_to_general_support do TicketService.assign(input().ticket, team: :general) @@ -296,6 +307,15 @@ switch :route_by_category, on: fn -> context().category end do end ``` +**Current Alternative:** Use the `branch` macro with pattern matching: +```elixir +branch on: get_context(:category) do + "billing" -> step :assign_billing do ... end + "technical" -> step :assign_engineering do ... end + _ -> step :assign_general do ... end +end +``` + --- ## Context Management @@ -339,6 +359,11 @@ current_step() init_accumulator(:events, []) append_context(:events, new_event) increment_context(:counter, 1) + +# Parallel Results (after parallel block) +parallel_results() # Get all parallel results as map +parallel_result(:step_name) # Get specific step result +parallel_ok?(:step_name) # Check if step succeeded ``` --- @@ -1209,13 +1234,15 @@ Benefits: ### Phase 3: Advanced Features - [x] Wait primitives (sleep, wait_for_event, wait_for_input) - [x] Decision steps (legacy `decision` + `{:goto}`) -- [x] Branch macro (new intuitive conditional flow) -- [ ] Loops and iterations -- [ ] Parallel execution +- [x] Branch macro (intuitive conditional flow) +- [x] Parallel execution (with results model) +- [x] Compensation/saga +- [x] Cron scheduling +- [~] ForEach - **REMOVED** (use `Enum.map` instead) +- [~] Loops - **Skipped** (use step retries or `Enum` functions) - [ ] Workflow orchestration (call child workflows) +- [ ] Switch/case macro - [ ] Pipe-based API (functional workflow composition) -- [ ] Compensation/saga -- [ ] Cron scheduling ### Phase 4: Scalability - [ ] Redis queue adapter @@ -1224,6 +1251,8 @@ Benefits: - [ ] Horizontal scaling support ### Phase 5: Developer Experience +- [x] Module documentation (@moduledoc, @doc) +- [x] 5 documentation guides - [ ] CLI tools - [ ] Mix tasks - [ ] Testing helpers diff --git a/agents/context-index.md b/agents/context-index.md index 7f51837..2cc54e5 100644 --- a/agents/context-index.md +++ b/agents/context-index.md @@ -8,6 +8,31 @@ This index provides quick access to archived development discussions and impleme |-------|--------------|----------|--------| | [Wait Primitives Complete](./conversations/wait-primitives-complete/) | 2026-01-03 | 1 | Completed | +--- + +## 2026-01-23: ForEach Removal & Parallel Refactor + +**Key Changes:** +- Removed `foreach` primitive entirely +- Simplified parallel execution with new results model (`__results__`, `into:`, `returns:`) +- Updated `guides/parallel.md` with comprehensive documentation +- Consolidated planning docs: archived `IMPLEMENTATION_PLAN.md` + +**Files Changed:** +- `lib/durable/dsl/step.ex` - Removed foreach macro +- `lib/durable/executor.ex` - Removed foreach execution +- `test/durable/foreach_test.exs` - Deleted +- `guides/foreach.md` - Deleted +- `agents/WORKPLAN.md` - Updated status, test counts, added changelog +- `agents/arch.md` - Removed outdated sections, updated parallel docs +- `agents/IMPLEMENTATION_PLAN.md` → `agents/IMPLEMENTATION_PLAN_ARCHIVED.md` + +**Decision Rationale:** Users should use `Enum.map` or `Task.async_stream` for batch +processing. This simplifies the DSL while providing equivalent functionality through +idiomatic Elixir patterns. + +--- + ## Completed Topics | Topic | Completed | Description | @@ -91,15 +116,16 @@ Covers fixing CI failures after the parallel jobs feature. Key outcomes: ``` agents/ -├── conversations/ # Archived discussion topics +├── conversations/ # Archived discussion topics │ └── {topic-slug}/ -│ ├── README.md # Topic overview -│ ├── sessions/ # Individual session records +│ ├── README.md # Topic overview +│ ├── sessions/ # Individual session records │ └── implementation-plan.md -├── context-index.md # This file -├── .archived-topics.json # Machine-readable metadata -├── arch.md # Architecture notes -└── WORKPLAN.md # Work planning +├── context-index.md # This file +├── .archived-topics.json # Machine-readable metadata +├── arch.md # Architecture & technical reference +├── WORKPLAN.md # Current status & work planning +└── IMPLEMENTATION_PLAN_ARCHIVED.md # Historical (no longer maintained) ``` --- diff --git a/guides/orchestration.md b/guides/orchestration.md new file mode 100644 index 0000000..2a5b3e8 --- /dev/null +++ b/guides/orchestration.md @@ -0,0 +1,341 @@ +# Workflow Orchestration + +Compose workflows by calling child workflows from parent steps. + +## Setup + +```elixir +defmodule MyApp.MyWorkflow do + use Durable + use Durable.Helpers + use Durable.Context + use Durable.Orchestration # Import orchestration functions +end +``` + +## `call_workflow/3` — Synchronous Child + +Start a child workflow and wait for its result. The parent suspends until the child completes or fails. + +```elixir +workflow "order_pipeline" do + step :charge, fn data -> + case call_workflow(MyApp.PaymentWorkflow, %{"amount" => data.total}, + timeout: hours(1)) do + {:ok, result} -> + {:ok, assign(data, :payment_id, result["payment_id"])} + {:error, reason} -> + {:error, "Payment failed: #{inspect(reason)}"} + end + end +end +``` + +**Options:** + +| Option | Description | Default | +|--------|-------------|---------| +| `:ref` | Reference name for idempotency | Module name | +| `:timeout` | Max wait time in ms | None (wait forever) | +| `:timeout_value` | Value returned on timeout | `:child_timeout` | +| `:queue` | Queue for child workflow | `"default"` | + +**Return values:** + +| Child Status | Return | +|-------------|--------| +| Completed | `{:ok, child_context}` | +| Failed | `{:error, error_info}` | +| Cancelled | `{:error, error_info}` | +| Timeout | `{:ok, timeout_value}` | + +### How It Works + +1. Parent step calls `call_workflow(ChildModule, input)` +2. Child workflow execution is created with `parent_workflow_id` set +3. Parent suspends (like `wait_for_event`) +4. Child runs in the queue independently +5. When child completes/fails, parent is automatically notified +6. Parent resumes and `call_workflow` returns the result + +## `start_workflow/3` — Fire-and-Forget + +Start a child workflow without waiting. Parent continues immediately. + +```elixir +workflow "onboarding" do + step :send_emails, fn data -> + {:ok, welcome_id} = start_workflow(MyApp.EmailWorkflow, + %{"to" => data.email, "template" => "welcome"}, + ref: :welcome_email + ) + {:ok, assign(data, :welcome_workflow_id, welcome_id)} + end + + step :next_step, fn data -> + # Parent continues — doesn't wait for email to send + {:ok, data} + end +end +``` + +**Options:** + +| Option | Description | Default | +|--------|-------------|---------| +| `:ref` | Reference name for idempotency | Module name | +| `:queue` | Queue for child workflow | `"default"` | + +### Idempotency + +Both functions are idempotent on resume. If the parent workflow crashes and restarts: + +- `call_workflow` — Won't create a duplicate child. If the child already completed, returns the result immediately. +- `start_workflow` — Won't create a duplicate child. Returns the same `child_id`. + +The `:ref` option controls idempotency grouping. Use different refs to create multiple children of the same type: + +```elixir +step :send_multiple, fn data -> + {:ok, _} = start_workflow(MyApp.EmailWorkflow, + %{"template" => "welcome"}, ref: :welcome) + + {:ok, _} = start_workflow(MyApp.EmailWorkflow, + %{"template" => "getting_started"}, ref: :getting_started) + + {:ok, data} +end +``` + +## Cascade Cancellation + +Cancelling a parent automatically cancels all active children: + +```elixir +# This cancels the parent AND any pending/running/waiting children +Durable.cancel(parent_workflow_id, "User cancelled order") +``` + +Children that already completed are not affected. + +## Querying Children + +List child workflows for a parent: + +```elixir +# All children +children = Durable.list_children(parent_workflow_id) + +# Filter by status +running = Durable.list_children(parent_workflow_id, status: :running) +completed = Durable.list_children(parent_workflow_id, status: :completed) +``` + +## Examples + +### Order Pipeline + +A parent workflow that calls payment and notification children: + +```elixir +defmodule MyApp.PaymentWorkflow do + use Durable + use Durable.Context + + workflow "charge" do + step :process, fn _data -> + amount = input()["amount"] + put_context(:payment_id, "pay_#{:rand.uniform(10_000)}") + put_context(:charged, amount) + end + end +end + +defmodule MyApp.EmailWorkflow do + use Durable + use Durable.Context + + workflow "send_email" do + step :deliver, fn _data -> + to = input()["to"] + template = input()["template"] + Mailer.deliver(to, template) + put_context(:delivered, true) + end + end +end + +defmodule MyApp.OrderWorkflow do + use Durable + use Durable.Context + use Durable.Orchestration + use Durable.Helpers + + workflow "process_order" do + step :validate, fn _data -> + order = input() + put_context(:order_id, order["id"]) + put_context(:total, order["total"]) + put_context(:email, order["email"]) + end + + # Synchronous — wait for payment result + step :charge_payment, fn data -> + case call_workflow(MyApp.PaymentWorkflow, + %{"amount" => data.total}, timeout: hours(1)) do + {:ok, result} -> + {:ok, assign(data, :payment_id, result["payment_id"])} + {:error, reason} -> + {:error, "Payment failed: #{inspect(reason)}"} + end + end + + # Fire-and-forget — email sent independently + step :send_confirmation, fn data -> + {:ok, email_id} = start_workflow(MyApp.EmailWorkflow, + %{"to" => data.email, "template" => "order_confirmed"}, + ref: :confirmation_email + ) + {:ok, assign(data, :email_workflow_id, email_id)} + end + + step :complete, fn data -> + {:ok, assign(data, :status, "completed")} + end + end +end + +# Start the pipeline +{:ok, id} = Durable.start(MyApp.OrderWorkflow, %{ + "id" => "order_123", + "total" => 99.99, + "email" => "user@example.com" +}) +``` + +### Nested Workflows (A → B → C) + +Workflows can call children that call their own children: + +```elixir +defmodule MyApp.StepC do + use Durable + use Durable.Context + + workflow "step_c" do + step :work, fn _data -> + put_context(:c_result, "done") + end + end +end + +defmodule MyApp.StepB do + use Durable + use Durable.Context + use Durable.Orchestration + + workflow "step_b" do + step :call_c, fn data -> + case call_workflow(MyApp.StepC, %{}) do + {:ok, result} -> + {:ok, assign(data, :c_result, result["c_result"])} + {:error, reason} -> + {:error, reason} + end + end + end +end + +defmodule MyApp.StepA do + use Durable + use Durable.Context + use Durable.Orchestration + + workflow "step_a" do + step :call_b, fn data -> + case call_workflow(MyApp.StepB, %{}) do + {:ok, result} -> + {:ok, assign(data, :b_result, result)} + {:error, reason} -> + {:error, reason} + end + end + end +end +``` + +### Error Handling + +Handle child failures gracefully with branching: + +```elixir +workflow "resilient_order" do + step :try_payment, fn data -> + result = call_workflow(MyApp.PaymentWorkflow, + %{"amount" => data.total}, timeout: minutes(30)) + + case result do + {:ok, payment} -> + {:ok, data |> assign(:payment, payment) |> assign(:payment_status, :success)} + {:error, _reason} -> + {:ok, assign(data, :payment_status, :failed)} + end + end + + branch on: fn ctx -> ctx.payment_status end do + :success -> + step :fulfill, fn data -> + {:ok, assign(data, :fulfilled, true)} + end + + :failed -> + step :notify_failure, fn data -> + Mailer.send_payment_failure(data.email) + {:ok, assign(data, :fulfilled, false)} + end + end +end +``` + +## Limitations + +- `call_workflow` is not supported inside `parallel` blocks (use `start_workflow` instead) +- Child workflows run in the queue system — they're not executed inline by default +- The `:timeout` option requires the timeout checker to be running (same as `wait_for_event`) + +## Best Practices + +### Use Meaningful Refs + +```elixir +# Good — clear what each child does +start_workflow(MyApp.EmailWorkflow, input, ref: :welcome_email) +start_workflow(MyApp.EmailWorkflow, input, ref: :receipt_email) + +# Avoid — will collide if calling same module twice +start_workflow(MyApp.EmailWorkflow, input1) +start_workflow(MyApp.EmailWorkflow, input2) # Returns first child's ID! +``` + +### Handle Both Success and Failure + +```elixir +# Good — handles both cases +case call_workflow(MyApp.PaymentWorkflow, input) do + {:ok, result} -> handle_success(result) + {:error, reason} -> handle_failure(reason) +end + +# Risky — crashes on child failure +{:ok, result} = call_workflow(MyApp.PaymentWorkflow, input) +``` + +### Set Timeouts for call_workflow + +```elixir +# Good — won't wait forever +call_workflow(MyApp.SlowService, input, timeout: hours(2)) + +# Risky — waits indefinitely if child hangs +call_workflow(MyApp.SlowService, input) +```