-
-
Notifications
You must be signed in to change notification settings - Fork 986
fix: batchTriggerAndWait with duplicate idempotencyKeys (#2965) #2977
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
626cd9c
b72d09c
c1c5102
cd36575
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| --- | ||
| "@trigger.dev/webapp": patch | ||
| --- | ||
|
|
||
| Fix batchTriggerAndWait running forever when duplicate idempotencyKey is provided in the same batch | ||
|
|
||
| When using batchTriggerAndWait with duplicate idempotencyKeys in the same batch, the batch would never complete because the completedCount and expectedCount would be mismatched. This fix ensures that cached runs (duplicate idempotencyKeys) are properly tracked in the batch, with their completedCount incremented immediately if the cached run is already in a final status. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -124,11 +124,11 @@ export class BatchTriggerV3Service extends BaseService { | |
|
|
||
| const existingBatch = options.idempotencyKey | ||
| ? await this._prisma.batchTaskRun.findFirst({ | ||
| where: { | ||
| runtimeEnvironmentId: environment.id, | ||
| idempotencyKey: options.idempotencyKey, | ||
| }, | ||
| }) | ||
| where: { | ||
| runtimeEnvironmentId: environment.id, | ||
| idempotencyKey: options.idempotencyKey, | ||
| }, | ||
| }) | ||
| : undefined; | ||
|
|
||
| if (existingBatch) { | ||
|
|
@@ -167,16 +167,16 @@ export class BatchTriggerV3Service extends BaseService { | |
|
|
||
| const dependentAttempt = body?.dependentAttempt | ||
| ? await this._prisma.taskRunAttempt.findFirst({ | ||
| where: { friendlyId: body.dependentAttempt }, | ||
| include: { | ||
| taskRun: { | ||
| select: { | ||
| id: true, | ||
| status: true, | ||
| }, | ||
| where: { friendlyId: body.dependentAttempt }, | ||
| include: { | ||
| taskRun: { | ||
| select: { | ||
| id: true, | ||
| status: true, | ||
| }, | ||
| }, | ||
| }) | ||
| }, | ||
| }) | ||
| : undefined; | ||
|
|
||
| if ( | ||
|
|
@@ -890,7 +890,72 @@ export class BatchTriggerV3Service extends BaseService { | |
| } | ||
| } | ||
|
|
||
| return false; | ||
| // FIX for Issue #2965: When a run is cached (duplicate idempotencyKey), | ||
| // we need to ALWAYS create a BatchTaskRunItem to properly track it. | ||
| // This handles cases where cached run may originate from another batch. | ||
| // Use unique constraint (batchTaskRunId, taskRunId) to prevent duplicates. | ||
| const isAlreadyComplete = isFinalRunStatus(result.run.status); | ||
|
|
||
| logger.debug( | ||
| "[BatchTriggerV2][processBatchTaskRunItem] Cached run detected, creating batch item", | ||
| { | ||
| batchId: batch.friendlyId, | ||
| runId: task.runId, | ||
| cachedRunId: result.run.id, | ||
| cachedRunStatus: result.run.status, | ||
| isAlreadyComplete, | ||
| currentIndex, | ||
| } | ||
| ); | ||
|
|
||
| // Always create BatchTaskRunItem for cached runs | ||
| // This ensures proper tracking even for cross-batch scenarios | ||
| try { | ||
| await this._prisma.batchTaskRunItem.create({ | ||
| data: { | ||
| batchTaskRunId: batch.id, | ||
| taskRunId: result.run.id, | ||
| // Use batchTaskRunItemStatusForRunStatus() for all cases | ||
| // This correctly maps both successful (COMPLETED) and failed (FAILED) statuses | ||
| status: batchTaskRunItemStatusForRunStatus(result.run.status), | ||
| }, | ||
| }); | ||
|
|
||
| // Only increment completedCount if the cached run is already finished | ||
| // For in-progress runs, completedCount will be incremented when the run completes | ||
| if (isAlreadyComplete) { | ||
| await this._prisma.batchTaskRun.update({ | ||
| where: { id: batch.id }, | ||
| data: { | ||
| completedCount: { | ||
| increment: 1, | ||
| }, | ||
| }, | ||
| }); | ||
| } | ||
|
|
||
| // Return true so expectedCount is incremented | ||
| return true; | ||
| } catch (error) { | ||
| if (isUniqueConstraintError(error, ["batchTaskRunId", "taskRunId"])) { | ||
| // BatchTaskRunItem already exists for this batch and cached run | ||
| // This can happen if the same idempotencyKey is used multiple times in the same batch | ||
| logger.debug( | ||
| "[BatchTriggerV2][processBatchTaskRunItem] BatchTaskRunItem already exists for cached run", | ||
| { | ||
| batchId: batch.friendlyId, | ||
| runId: task.runId, | ||
| cachedRunId: result.run.id, | ||
| currentIndex, | ||
| } | ||
| ); | ||
|
|
||
| // Don't increment expectedCount since this item is already tracked | ||
| return false; | ||
| } | ||
|
|
||
| throw error; | ||
| } | ||
|
Comment on lines
+893
to
+958
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Account for FAILED cached runs in batch completion. Consider counting 🔧 Suggested fix (count FAILED as completed)- const completedCount = await tx.batchTaskRunItem.count({
- where: { batchTaskRunId: batchId, status: "COMPLETED" },
- });
+ const completedCount = await tx.batchTaskRunItem.count({
+ where: {
+ batchTaskRunId: batchId,
+ status: { in: ["COMPLETED", "FAILED"] },
+ },
+ });🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| async #enqueueBatchTaskRun(options: BatchProcessingOptions) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 Cached runs with failed status are not counted for batch completion
When a cached run has a failed status (e.g.,
COMPLETED_WITH_ERRORS,CRASHED,SYSTEM_FAILURE), the batch will never complete.Click to expand
Root Cause
The fix creates a
BatchTaskRunItemwith status based onbatchTaskRunItemStatusForRunStatus(result.run.status)(line 920). For failed run statuses, this returnsFAILED(seetaskRun.server.ts:119-126):However,
tryCompleteBatchV3only counts items withstatus: "COMPLETED"(line 1034-1035):Actual vs Expected
Actual: For a cached run with
COMPLETED_WITH_ERRORSstatus:BatchTaskRunItemis created with statusFAILEDisAlreadyCompleteistrue(line 897) since it's a final statustryCompleteBatchV3only countsCOMPLETEDitems, missing this itemexpectedCountExpected: The batch should complete when all items have finished, regardless of whether the cached runs succeeded or failed.
Impact
This partially defeats the purpose of the fix -
batchTriggerAndWaitwill still run forever if duplicate idempotency keys reference runs that have already failed.Recommendation: For cached runs that are already complete (regardless of success/failure), create the
BatchTaskRunItemwith statusCOMPLETEDinstead of usingbatchTaskRunItemStatusForRunStatus(). This aligns with howcompleteBatchTaskRunItemV3works - it always sets status toCOMPLETEDwhen a run finishes (line 1088).Was this helpful? React with 👍 or 👎 to provide feedback.