Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions packages/opencode/src/tasks/pulse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,13 @@ export function startPulse(jobId: string, projectId: string, pmSessionId: string
}

const startJob = async (): Promise<void> => {
const existingPid = await readLockPid(jobId, projectId).catch(() => null)
if (existingPid && isPidAlive(existingPid)) {
log.error("job already running", { jobId, existingPid })
await writeLockFile(jobId, projectId, process.pid)
const lockPid = await readLockPid(jobId, projectId).catch(() => null)
if (lockPid !== process.pid) {
log.error("lost lock file race, aborting start", { jobId, lockPid, myPid: process.pid })
return
}
if (existingPid && !isPidAlive(existingPid)) {
log.warn("overwriting stale lock file", { jobId, oldPid: existingPid })
}
writeLockFile(jobId, projectId, process.pid).catch((e) =>
log.error("failed to write lock file", { jobId, error: String(e) }),
)
await Store.updateJob(projectId, jobId, { pulse_pid: process.pid })
}

startJob()
Expand Down Expand Up @@ -119,19 +115,21 @@ export async function resurrectionScan(jobId: string, projectId: string): Promis
const alive = pidAlive || sessionAlive

if (!alive) {
if (task.pipeline.stage === "developing") {
// Developer finished before restart — advance to reviewing, preserve worktree/branch
if (task.pipeline.stage === "developing" || task.pipeline.stage === "adversarial-running") {
// Developer or adversarial finished before restart — advance to reviewing, preserve worktree/branch
await Store.updateTask(projectId, task.id, {
assignee: null,
assignee_pid: null,
pipeline: { ...task.pipeline, stage: "reviewing", last_activity: new Date().toISOString() },
}, true)
await Store.addComment(projectId, task.id, {
author: "system",
message: "Resurrected: developer session ended before restart. Advanced to reviewing.",
message: task.pipeline.stage === "developing"
? "Resurrected: developer session ended before restart. Advanced to reviewing."
: "Resurrected: adversarial session ended before restart. Returned to reviewing.",
created_at: new Date().toISOString(),
})
log.info("resurrected developing task to reviewing", { taskId: task.id, jobId })
log.info("resurrected task to reviewing", { taskId: task.id, jobId, fromStage: task.pipeline.stage })
} else {
// Other stages — reset to idle (existing behavior)
let worktreeRemoved = false
Expand Down Expand Up @@ -237,7 +235,7 @@ function isPidAlive(pid: number): boolean {
}
}

export { isPidAlive, writeLockFile, removeLockFile, readLockPid, processAdversarialVerdicts, spawnAdversarial, scheduleReadyTasks, heartbeatActiveAgents }
export { isPidAlive, writeLockFile, removeLockFile, readLockPid, checkTimeouts, processAdversarialVerdicts, spawnAdversarial, scheduleReadyTasks, heartbeatActiveAgents }

async function scheduleReadyTasks(jobId: string, projectId: string, pmSessionId: string): Promise<void> {
const job = await Store.getJob(projectId, jobId)
Expand Down Expand Up @@ -416,7 +414,7 @@ async function checkTimeouts(jobId: string, projectId: string): Promise<void> {
const allTasks = await Store.listTasks(projectId)
const jobTasks = allTasks.filter((t) => t.job_id === jobId)
const now = Date.now()
const ADVERSARIAL_TIMEOUT_MS = 60 * 60 * 1000
const ADVERSARIAL_TIMEOUT_MS = 30 * 60 * 1000
const SESSION_MESSAGE_TIMEOUT_MS = 30 * 60 * 1000

for (const task of jobTasks) {
Expand Down Expand Up @@ -508,7 +506,7 @@ async function checkTimeouts(jobId: string, projectId: string): Promise<void> {

await Store.addComment(projectId, task.id, {
author: "system",
message: "Adversarial agent timed out after 60 minutes. Will retry on next Pulse tick.",
message: "Adversarial agent timed out after 30 minutes. Will retry on next Pulse tick.",
created_at: new Date().toISOString(),
})
}
Expand Down
4 changes: 2 additions & 2 deletions packages/opencode/src/tasks/tool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ if (params.command === "start") {
assignee: null,
assignee_pid: null,
pipeline: { ...task.pipeline, stage: "done" },
})
}, true)

await Store.addComment(projectId, params.taskId, {
author: "system",
Expand Down Expand Up @@ -935,7 +935,7 @@ if (params.command === "start") {
adversarial_verdict: null,
last_activity: null,
},
})
}, true)

await Store.addComment(projectId, params.taskId, {
author: "system",
Expand Down
182 changes: 182 additions & 0 deletions packages/opencode/test/tasks/tool.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import { describe, test, expect, beforeEach, afterEach } from "bun:test"
import { Store } from "../../src/tasks/store"
import { Global } from "../../src/global"
import path from "path"
import fs from "fs/promises"

const TEST_PROJECT_ID = "test-tool-project"
const TEST_JOB_ID = "job-test-tool-123"
const TEST_TASK_ID = "task-tool-1"

describe("tool.ts - taskctl commands", () => {
let originalDataPath: string
let testDataDir: string

beforeEach(async () => {
originalDataPath = Global.Path.data
testDataDir = path.join("/tmp", "opencode-tool-test-" + Math.random().toString(36).slice(2))
await fs.mkdir(testDataDir, { recursive: true })

process.env.OPENCODE_TEST_HOME = testDataDir
await Global.init()

await Store.createJob(TEST_PROJECT_ID, {
id: TEST_JOB_ID,
parent_issue: 123,
status: "running",
created_at: new Date().toISOString(),
stopping: false,
pulse_pid: null,
max_workers: 1,
pm_session_id: "pm-session",
})
})

afterEach(async () => {
await fs.rm(testDataDir, { recursive: true, force: true }).catch(() => {})
if (originalDataPath) {
delete process.env.OPENCODE_TEST_HOME
}
})

describe("retry command", () => {
test("retry resets adversarial-running task without throwing", async () => {
const worktreePath = path.join(testDataDir, "worktree-retry")
await fs.mkdir(worktreePath, { recursive: true })

const task: any = {
id: TEST_TASK_ID,
job_id: TEST_JOB_ID,
status: "failed",
priority: 1,
task_type: "implementation",
parent_issue: 123,
labels: [],
depends_on: [],
assignee: "ses-session123",
assignee_pid: 12345,
worktree: worktreePath,
branch: "feature/test",
title: "Test task",
description: "Test",
acceptance_criteria: "Test",
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
close_reason: null,
comments: [],
pipeline: {
stage: "adversarial-running",
attempt: 1,
last_activity: new Date().toISOString(),
last_steering: null,
history: [],
adversarial_verdict: {
verdict: "ISSUES_FOUND",
summary: "Test issues",
issues: [{ location: "file.ts", severity: "HIGH", fix: "Fix this" }],
created_at: new Date().toISOString(),
},
},
}

await Store.createTask(TEST_PROJECT_ID, task)

await expect(async () => {
const result = await Store.updateTask(TEST_PROJECT_ID, TEST_TASK_ID, {
status: "open",
assignee: null,
assignee_pid: null,
worktree: null,
branch: null,
pipeline: {
...task.pipeline,
stage: "idle",
attempt: 1,
adversarial_verdict: null,
last_activity: null,
},
}, true)

await Store.addComment(TEST_PROJECT_ID, TEST_TASK_ID, {
author: "system",
message: `Retried by PM. Task reset to open. Pulse will reschedule on next tick.`,
created_at: new Date().toISOString(),
})
}).not.toThrow()

const updated = await Store.getTask(TEST_PROJECT_ID, TEST_TASK_ID)
expect(updated?.status).toBe("open")
expect(updated?.assignee).toBeNull()
expect(updated?.worktree).toBeNull()
expect(updated?.branch).toBeNull()
expect(updated?.pipeline.stage).toBe("idle")
expect(updated?.pipeline.attempt).toBe(1)
expect(updated?.pipeline.adversarial_verdict).toBeNull()
})
})

describe("override command", () => {
test("override --skip on adversarial-running task succeeds without throwing", async () => {
const worktreePath = path.join(testDataDir, "worktree-override")
await fs.mkdir(worktreePath, { recursive: true })

const task: any = {
id: TEST_TASK_ID,
job_id: TEST_JOB_ID,
status: "failed",
priority: 1,
task_type: "implementation",
parent_issue: 123,
labels: [],
depends_on: [],
assignee: "ses-session456",
assignee_pid: 67890,
worktree: worktreePath,
branch: "feature/override-test",
title: "Test task",
description: "Test",
acceptance_criteria: "Test",
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
close_reason: null,
comments: [],
pipeline: {
stage: "adversarial-running",
attempt: 1,
last_activity: new Date().toISOString(),
last_steering: null,
history: [],
adversarial_verdict: null,
},
}

await Store.createTask(TEST_PROJECT_ID, task)

await expect(async () => {
await Store.updateTask(TEST_PROJECT_ID, TEST_TASK_ID, {
status: "closed",
close_reason: "skipped by PM",
worktree: null,
branch: null,
assignee: null,
assignee_pid: null,
pipeline: { ...task.pipeline, stage: "done" },
}, true)

await Store.addComment(TEST_PROJECT_ID, TEST_TASK_ID, {
author: "system",
message: "Skipped by PM override. Dependent tasks are now unblocked.",
created_at: new Date().toISOString(),
})
}).not.toThrow()

const updated = await Store.getTask(TEST_PROJECT_ID, TEST_TASK_ID)
expect(updated?.status).toBe("closed")
expect(updated?.close_reason).toBe("skipped by PM")
expect(updated?.worktree).toBeNull()
expect(updated?.branch).toBeNull()
expect(updated?.assignee).toBeNull()
expect(updated?.pipeline.stage).toBe("done")
})
})
})