Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 92 additions & 5 deletions packages/opencode/src/tasks/pulse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { SessionStatus } from "../session/status"
import type { Task, Job, AdversarialVerdict } from "./types"
import { Instance, context as instanceContext } from "../project/instance"
import { GlobalBus } from "../bus/global"
import { Identifier } from "../id/id"

const log = Log.create({ service: "taskctl.pulse" })
const activeTicks = new Map<string, Set<string>>()
Expand Down Expand Up @@ -235,7 +236,76 @@ function isPidAlive(pid: number): boolean {
}
}

export { isPidAlive, writeLockFile, removeLockFile, readLockPid, checkTimeouts, processAdversarialVerdicts, spawnAdversarial, scheduleReadyTasks, heartbeatActiveAgents }
async function notifyPM(pmSessionId: string, text: string): Promise<{ ok: true } | { ok: false; error: string }> {
try {
// Validate text input
if (!text || text.length === 0) {
return { ok: false, error: "Notification text cannot be empty" }
}
if (text.length > 10000) {
return { ok: false, error: "Notification text exceeds maximum length (10000 chars)" }
}

const messageId = Identifier.ascending("message")
const partId = Identifier.ascending("part")
const now = Date.now()

// Validate session ID format before attempting lookup
if (!pmSessionId || typeof pmSessionId !== "string" || pmSessionId.length < 5) {
return { ok: false, error: "Invalid PM session ID format" }
}

const pmSession = await Session.get(pmSessionId).catch(() => null)
if (!pmSession) {
log.warn("PM session not found for notification", { pmSessionId })
return { ok: false, error: "PM session not found" }
}

// Use for await to avoid loading all messages into memory
let lastMsg: MessageV2.WithParts | null = null
for await (const msg of MessageV2.stream(pmSessionId)) {
lastMsg = msg
break // Only need the first (most recent) message
}

// Determine parent message ID with safety checks
const parentID = lastMsg?.info.id ?? pmSessionId

await Session.updateMessage({
id: messageId,
sessionID: pmSessionId,
role: "assistant",
time: { created: now, completed: now },
error: undefined,
parentID,
modelID: "notification",
providerID: "system",
mode: "notification",
agent: "system",
path: { cwd: pmSession.directory, root: pmSession.directory },
cost: 0,
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
})

await Session.updatePart({
id: partId,
messageID: messageId,
sessionID: pmSessionId,
type: "text",
text,
synthetic: true,
})

log.info("PM notification sent", { pmSessionId })
return { ok: true }
} catch (e) {
const errorMsg = String(e)
log.error("failed to notify PM", { pmSessionId, error: errorMsg })
return { ok: false, error: errorMsg }
}
}

export { isPidAlive, writeLockFile, removeLockFile, readLockPid, checkTimeouts, processAdversarialVerdicts, spawnAdversarial, scheduleReadyTasks, heartbeatActiveAgents, commitTask, escalateToPM, escalateCommitFailure, notifyPM }

async function scheduleReadyTasks(jobId: string, projectId: string, pmSessionId: string): Promise<void> {
const job = await Store.getJob(projectId, jobId)
Expand Down Expand Up @@ -692,7 +762,12 @@ If there is an error, report the full error output.`
parentSessionID: undefined,
})

log.info("task committed and closed", { taskId: task.id })
const notifyResult = await notifyPM(pmSessionId, `✅ Task complete: ${task.title} (${task.id})`)
if (!notifyResult.ok) {
log.warn("failed to notify PM of task completion", { taskId: task.id, error: notifyResult.error })
}

log.info("task committed and closed", { taskId: task.id })

// Immediately reschedule after commit — don't wait for next Pulse tick
await scheduleReadyTasks(jobId, projectId, pmSessionId).catch((e) =>
Expand Down Expand Up @@ -826,7 +901,15 @@ async function escalateToPM(task: Task, jobId: string, projectId: string, pmSess
parentSessionID: undefined,
})

log.error("task escalated to PM after 3 failures", { taskId: task.id, jobId })
const notifyResult = await notifyPM(
pmSessionId,
`❌ Task failed: ${task.title} (${task.id})\nUse: taskctl retry ${task.id}`
)
if (!notifyResult.ok) {
log.warn("failed to notify PM of task escalation", { taskId: task.id, error: notifyResult.error })
}

log.error("task escalated to PM after 3 failures", { taskId: task.id, jobId })
}

async function escalateCommitFailure(
Expand Down Expand Up @@ -1180,8 +1263,12 @@ export async function checkCompletion(
clearIntervalSafe(interval)
activeTicks.get(projectId)?.delete(jobId)
await removeLockFile(jobId, projectId)
await Store.updateJob(projectId, jobId, { status: "complete" })
Bus.publish(BackgroundTaskEvent.Completed, { taskID: jobId, sessionID: pmSessionId, parentSessionID: undefined })
await Store.updateJob(projectId, jobId, { status: "complete" })
Bus.publish(BackgroundTaskEvent.Completed, { taskID: jobId, sessionID: pmSessionId, parentSessionID: undefined })
const notifyResult = await notifyPM(pmSessionId, `🎉 Job complete: all tasks done for issue #${jobTasks[0]?.parent_issue ?? "unknown"}`)
if (!notifyResult.ok) {
log.warn("failed to notify PM of job completion", { jobId, error: notifyResult.error })
}
} catch (e) {
activeTicks.get(projectId)?.delete(jobId)
await removeLockFile(jobId, projectId).catch(() => {})
Expand Down
Loading