diff --git a/packages/opencode/src/tasks/pulse.ts b/packages/opencode/src/tasks/pulse.ts index 2a7f969f004d..0f59539886cc 100644 --- a/packages/opencode/src/tasks/pulse.ts +++ b/packages/opencode/src/tasks/pulse.ts @@ -15,6 +15,7 @@ import { SessionStatus } from "../session/status" import type { Task, Job, AdversarialVerdict } from "./types" import { Instance, context as instanceContext } from "../project/instance" import { GlobalBus } from "../bus/global" +import { Identifier } from "../id/id" const log = Log.create({ service: "taskctl.pulse" }) const activeTicks = new Map>() @@ -235,7 +236,76 @@ function isPidAlive(pid: number): boolean { } } -export { isPidAlive, writeLockFile, removeLockFile, readLockPid, checkTimeouts, processAdversarialVerdicts, spawnAdversarial, scheduleReadyTasks, heartbeatActiveAgents } +async function notifyPM(pmSessionId: string, text: string): Promise<{ ok: true } | { ok: false; error: string }> { + try { + // Validate text input + if (!text || text.length === 0) { + return { ok: false, error: "Notification text cannot be empty" } + } + if (text.length > 10000) { + return { ok: false, error: "Notification text exceeds maximum length (10000 chars)" } + } + + const messageId = Identifier.ascending("message") + const partId = Identifier.ascending("part") + const now = Date.now() + + // Validate session ID format before attempting lookup + if (!pmSessionId || typeof pmSessionId !== "string" || pmSessionId.length < 5) { + return { ok: false, error: "Invalid PM session ID format" } + } + + const pmSession = await Session.get(pmSessionId).catch(() => null) + if (!pmSession) { + log.warn("PM session not found for notification", { pmSessionId }) + return { ok: false, error: "PM session not found" } + } + + // Use for await to avoid loading all messages into memory + let lastMsg: MessageV2.WithParts | null = null + for await (const msg of MessageV2.stream(pmSessionId)) { + lastMsg = msg + break // Only need the first (most recent) message + } + + // Determine parent message ID with safety checks + const parentID = lastMsg?.info.id ?? pmSessionId + + await Session.updateMessage({ + id: messageId, + sessionID: pmSessionId, + role: "assistant", + time: { created: now, completed: now }, + error: undefined, + parentID, + modelID: "notification", + providerID: "system", + mode: "notification", + agent: "system", + path: { cwd: pmSession.directory, root: pmSession.directory }, + cost: 0, + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + }) + + await Session.updatePart({ + id: partId, + messageID: messageId, + sessionID: pmSessionId, + type: "text", + text, + synthetic: true, + }) + + log.info("PM notification sent", { pmSessionId }) + return { ok: true } + } catch (e) { + const errorMsg = String(e) + log.error("failed to notify PM", { pmSessionId, error: errorMsg }) + return { ok: false, error: errorMsg } + } +} + +export { isPidAlive, writeLockFile, removeLockFile, readLockPid, checkTimeouts, processAdversarialVerdicts, spawnAdversarial, scheduleReadyTasks, heartbeatActiveAgents, commitTask, escalateToPM, escalateCommitFailure, notifyPM } async function scheduleReadyTasks(jobId: string, projectId: string, pmSessionId: string): Promise { const job = await Store.getJob(projectId, jobId) @@ -692,7 +762,12 @@ If there is an error, report the full error output.` parentSessionID: undefined, }) - log.info("task committed and closed", { taskId: task.id }) + const notifyResult = await notifyPM(pmSessionId, `✅ Task complete: ${task.title} (${task.id})`) + if (!notifyResult.ok) { + log.warn("failed to notify PM of task completion", { taskId: task.id, error: notifyResult.error }) + } + + log.info("task committed and closed", { taskId: task.id }) // Immediately reschedule after commit — don't wait for next Pulse tick await scheduleReadyTasks(jobId, projectId, pmSessionId).catch((e) => @@ -826,7 +901,15 @@ async function escalateToPM(task: Task, jobId: string, projectId: string, pmSess parentSessionID: undefined, }) - log.error("task escalated to PM after 3 failures", { taskId: task.id, jobId }) + const notifyResult = await notifyPM( + pmSessionId, + `❌ Task failed: ${task.title} (${task.id})\nUse: taskctl retry ${task.id}` + ) + if (!notifyResult.ok) { + log.warn("failed to notify PM of task escalation", { taskId: task.id, error: notifyResult.error }) + } + + log.error("task escalated to PM after 3 failures", { taskId: task.id, jobId }) } async function escalateCommitFailure( @@ -1180,8 +1263,12 @@ export async function checkCompletion( clearIntervalSafe(interval) activeTicks.get(projectId)?.delete(jobId) await removeLockFile(jobId, projectId) - await Store.updateJob(projectId, jobId, { status: "complete" }) - Bus.publish(BackgroundTaskEvent.Completed, { taskID: jobId, sessionID: pmSessionId, parentSessionID: undefined }) + await Store.updateJob(projectId, jobId, { status: "complete" }) + Bus.publish(BackgroundTaskEvent.Completed, { taskID: jobId, sessionID: pmSessionId, parentSessionID: undefined }) + const notifyResult = await notifyPM(pmSessionId, `🎉 Job complete: all tasks done for issue #${jobTasks[0]?.parent_issue ?? "unknown"}`) + if (!notifyResult.ok) { + log.warn("failed to notify PM of job completion", { jobId, error: notifyResult.error }) + } } catch (e) { activeTicks.get(projectId)?.delete(jobId) await removeLockFile(jobId, projectId).catch(() => {}) diff --git a/packages/opencode/test/tasks/notifications.test.ts b/packages/opencode/test/tasks/notifications.test.ts new file mode 100644 index 000000000000..02d3d6f509ce --- /dev/null +++ b/packages/opencode/test/tasks/notifications.test.ts @@ -0,0 +1,291 @@ +import { describe, test, expect, beforeEach, afterEach } from "bun:test" +import { Store } from "../../src/tasks/store" +import { Global } from "../../src/global" +import { BackgroundTaskEvent } from "../../src/session/async-tasks" +import { Bus } from "../../src/bus" +import path from "path" +import fs from "fs/promises" +import { Instance } from "../../src/project/instance" +import { Session } from "../../src/session" +import { MessageV2 } from "../../src/session/message-v2" +import { Identifier } from "../../src/id/id" + +const TEST_PROJECT_ID = "test-notifications-project" +const TEST_JOB_ID = "job-notifications-test" + +describe("PM notifications", () => { + let originalDataPath: string + let testDataDir: string + let pmSessionId: string + + beforeEach(async () => { + originalDataPath = Global.Path.data + testDataDir = path.join("/tmp", "opencode-notifications-test-" + Math.random().toString(36).slice(2)) + await fs.mkdir(testDataDir, { recursive: true }) + + process.env.OPENCODE_TEST_HOME = testDataDir + await Global.init() + }) + + afterEach(async () => { + const tasksDir = path.join(Global.Path.data, "tasks", TEST_PROJECT_ID) + const lockPath = path.join(tasksDir, `job-${TEST_JOB_ID}.lock`) + await fs.unlink(lockPath).catch(() => {}) + + await fs.rm(testDataDir, { recursive: true, force: true }).catch(() => {}) + if (originalDataPath) { + delete process.env.OPENCODE_TEST_HOME + } + }) + + test("BackgroundTaskEvent.Completed fired when task escalated (after 3 failures)", async () => { + const { escalateToPM } = await import("../../src/tasks/pulse") + + await Instance.provide({ + directory: testDataDir, + fn: async () => { + const pmSession = await Session.createNext({ + directory: testDataDir, + }) + pmSessionId = pmSession.id + + const mockTask: any = { + id: "task-escalated-test", + job_id: TEST_JOB_ID, + status: "review", + priority: 2, + task_type: "implementation", + parent_issue: 273, + labels: [], + depends_on: [], + assignee: null, + assignee_pid: null, + worktree: testDataDir, + branch: "feature/test", + title: "Task failed adversarial", + description: "Test description", + acceptance_criteria: "Test criteria", + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + close_reason: null, + comments: [], + pipeline: { + stage: "adversarial-running", + attempt: 3, + last_activity: new Date().toISOString(), + last_steering: null, + history: [], + adversarial_verdict: { + verdict: "CRITICAL_ISSUES_FOUND", + issues: [ + { + location: "src/index.ts:42", + severity: "CRITICAL", + fix: "Fix security vulnerability", + }, + ], + summary: "Critical security issue found", + created_at: new Date().toISOString(), + }, + }, + } + + await Store.createJob(TEST_PROJECT_ID, { + id: TEST_JOB_ID, + parent_issue: 273, + status: "running", + created_at: new Date().toISOString(), + stopping: false, + pulse_pid: null, + max_workers: 1, + pm_session_id: pmSessionId, + }) + + await Store.createTask(TEST_PROJECT_ID, mockTask) + + let eventFired = false + const unsub = Bus.subscribe(BackgroundTaskEvent.Completed, (evt) => { + if (evt.properties.taskID.startsWith("escalation-") && evt.properties.sessionID === pmSessionId) { + eventFired = true + unsub() + } + }) + + await escalateToPM(mockTask, TEST_JOB_ID, TEST_PROJECT_ID, pmSessionId) + + await new Promise((r) => setTimeout(r, 200)) + expect(eventFired).toBe(true) + + const updated = await Store.getTask(TEST_PROJECT_ID, "task-escalated-test") + expect(updated?.status).toBe("failed") + expect(updated?.pipeline.stage).toBe("failed") + }, + }) + }) + + test("BackgroundTaskEvent.Completed fired when job completes (all tasks done)", async () => { + const { checkCompletion } = await import("../../src/tasks/pulse") + + await Instance.provide({ + directory: testDataDir, + fn: async () => { + const pmSession = await Session.createNext({ + directory: testDataDir, + }) + pmSessionId = pmSession.id + + const jobId = "job-completion-test-" + Date.now() + + await Store.createJob(TEST_PROJECT_ID, { + id: jobId, + parent_issue: 273, + status: "running", + created_at: new Date().toISOString(), + stopping: false, + pulse_pid: null, + max_workers: 1, + pm_session_id: pmSessionId, + }) + + // Create two completed tasks + for (let i = 1; i <= 2; i++) { + await Store.createTask(TEST_PROJECT_ID, { + id: `task-completion-${jobId}-${i}`, + job_id: jobId, + status: "closed", + priority: 2, + task_type: "implementation", + parent_issue: 273, + labels: [], + depends_on: [], + assignee: null, + assignee_pid: null, + worktree: null, + branch: null, + title: `Completed task ${i}`, + description: "Test description", + acceptance_criteria: "Test criteria", + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + close_reason: "approved and committed", + comments: [], + pipeline: { + stage: "done", + attempt: 0, + last_activity: null, + last_steering: null, + history: [], + adversarial_verdict: null, + }, + }) + } + + let jobCompletionEventFired = false + const unsub = Bus.subscribe(BackgroundTaskEvent.Completed, (evt) => { + if (evt.properties.taskID === jobId && evt.properties.sessionID === pmSessionId) { + jobCompletionEventFired = true + unsub() + } + }) + + const testInterval = setInterval(() => {}, 5000) + try { + await checkCompletion(jobId, TEST_PROJECT_ID, pmSessionId, testInterval) + } finally { + clearInterval(testInterval) + } + + await new Promise((r) => setTimeout(r, 200)) + expect(jobCompletionEventFired).toBe(true) + + const job = await Store.getJob(TEST_PROJECT_ID, jobId) + expect(job?.status).toBe("complete") + }, + }) + }) + + test("System notification message added to PM session when task done", async () => { + const { notifyPM } = await import("../../src/tasks/pulse") + + await Instance.provide({ + directory: testDataDir, + fn: async () => { + const pmSession = await Session.createNext({ + directory: testDataDir, + }) + pmSessionId = pmSession.id + + const message = "✅ Task complete: Test Task (task-123)" + + const result = await notifyPM(pmSessionId, message) + expect(result.ok).toBe(true) + + await new Promise((r) => setTimeout(r, 100)) + + // Verify message was added to session + const msgs: MessageV2.WithParts[] = [] + for await (const msg of MessageV2.stream(pmSessionId)) { + msgs.push(msg) + } + + const notificationMsg = msgs.find((m) => m.info.role === "assistant" && m.info.agent === "system") + expect(notificationMsg).toBeDefined() + + const textPart = notificationMsg?.parts.find((p): p is MessageV2.TextPart => p.type === "text" && (p.synthetic ?? false)) + expect(textPart?.text).toContain("Task complete") + }, + }) + }) + + test("notifyPM returns error for invalid session ID", async () => { + const { notifyPM } = await import("../../src/tasks/pulse") + + const result = await notifyPM("invalid-session", "test message") + expect(result.ok).toBe(false) + if (!result.ok) { + expect(result.error).toBeTruthy() + expect(typeof result.error).toBe("string") + } + }) + + test("notifyPM returns error for empty text", async () => { + const { notifyPM } = await import("../../src/tasks/pulse") + + await Instance.provide({ + directory: testDataDir, + fn: async () => { + const pmSession = await Session.createNext({ + directory: testDataDir, + }) + pmSessionId = pmSession.id + + const result = await notifyPM(pmSessionId, "") + expect(result.ok).toBe(false) + if (!result.ok) { + expect(result.error).toContain("empty") + } + }, + }) + }) + + test("notifyPM returns error for text exceeding length limit", async () => { + const { notifyPM } = await import("../../src/tasks/pulse") + + await Instance.provide({ + directory: testDataDir, + fn: async () => { + const pmSession = await Session.createNext({ + directory: testDataDir, + }) + pmSessionId = pmSession.id + + const longText = "x".repeat(10001) + const result = await notifyPM(pmSessionId, longText) + expect(result.ok).toBe(false) + if (!result.ok) { + expect(result.error).toContain("exceeds") + } + }, + }) + }) +})