Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/core/task/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,20 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
userMessageContent: (Anthropic.TextBlockParam | Anthropic.ImageBlockParam | Anthropic.ToolResultBlockParam)[] = []
userMessageContentReady = false

/**
* Flag indicating whether the assistant message for the current streaming session
* has been saved to API conversation history.
*
* This is critical for parallel tool calling: tools should NOT execute until
* the assistant message is saved. Otherwise, if a tool like `new_task` triggers
* `flushPendingToolResultsToHistory()`, the user message with tool_results would
* appear BEFORE the assistant message with tool_uses, causing API errors.
*
* Reset to `false` at the start of each API request.
* Set to `true` after the assistant message is saved in `recursivelyMakeClineRequests`.
*/
assistantMessageSavedToHistory = false

/**
* Push a tool_result block to userMessageContent, preventing duplicates.
* Duplicate tool_use_ids cause API errors.
Expand Down Expand Up @@ -1063,6 +1077,36 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
return
}

// CRITICAL: Wait for the assistant message to be saved to API history first.
// Without this, tool_result blocks would appear BEFORE tool_use blocks in the
// conversation history, causing API errors like:
// "unexpected `tool_use_id` found in `tool_result` blocks"
//
// This can happen when parallel tools are called (e.g., update_todo_list + new_task).
// Tools execute during streaming via presentAssistantMessage, BEFORE the assistant
// message is saved. When new_task triggers delegation, it calls this method to
// flush pending results - but the assistant message hasn't been saved yet.
//
// The assistantMessageSavedToHistory flag is:
// - Reset to false at the start of each API request
// - Set to true after the assistant message is saved in recursivelyMakeClineRequests
if (!this.assistantMessageSavedToHistory) {
await pWaitFor(() => this.assistantMessageSavedToHistory || this.abort, {
interval: 50,
timeout: 30_000, // 30 second timeout as safety net
}).catch(() => {
// If timeout or abort, log and proceed anyway to avoid hanging
console.warn(
`[Task#${this.taskId}] flushPendingToolResultsToHistory: timed out waiting for assistant message to be saved`,
)
})
}

// If task was aborted while waiting, don't flush
if (this.abort) {
return
}

// Save the user message with tool_result blocks
const userMessage: Anthropic.MessageParam = {
role: "user",
Expand Down Expand Up @@ -2707,6 +2751,7 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
this.userMessageContentReady = false
this.didRejectTool = false
this.didAlreadyUseTool = false
this.assistantMessageSavedToHistory = false
// Reset tool failure flag for each new assistant turn - this ensures that tool failures
// only prevent attempt_completion within the same assistant message, not across turns
// (e.g., if a tool fails, then user sends a message saying "just complete anyway")
Expand Down Expand Up @@ -3488,6 +3533,7 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
{ role: "assistant", content: assistantContent },
reasoningMessage || undefined,
)
this.assistantMessageSavedToHistory = true

TelemetryService.instance.captureConversationMessage(this.taskId, "assistant")
}
Expand Down
101 changes: 100 additions & 1 deletion src/core/task/__tests__/flushPendingToolResultsToHistory.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ vi.mock("fs/promises", async (importOriginal) => {
}
})

const { mockPWaitFor } = vi.hoisted(() => {
return { mockPWaitFor: vi.fn().mockImplementation(async () => Promise.resolve()) }
})

vi.mock("p-wait-for", () => ({
default: vi.fn().mockImplementation(async () => Promise.resolve()),
default: mockPWaitFor,
}))

vi.mock("vscode", () => {
Expand Down Expand Up @@ -344,4 +348,99 @@ describe("flushPendingToolResultsToHistory", () => {
expect((task.apiConversationHistory[0] as any).ts).toBeGreaterThanOrEqual(beforeTs)
expect((task.apiConversationHistory[0] as any).ts).toBeLessThanOrEqual(afterTs)
})

it("should skip waiting for assistantMessageSavedToHistory when flag is already true", async () => {
const task = new Task({
provider: mockProvider,
apiConfiguration: mockApiConfig,
task: "test task",
startTask: false,
})

// Set flag to true (assistant message already saved)
task.assistantMessageSavedToHistory = true

// Set up pending tool result
task.userMessageContent = [
{
type: "tool_result",
tool_use_id: "tool-skip-wait",
content: "Result when flag is true",
},
]

// Clear mock call history
mockPWaitFor.mockClear()

await task.flushPendingToolResultsToHistory()

// Should not have called pWaitFor since flag was already true
expect(mockPWaitFor).not.toHaveBeenCalled()

// Should still save the message
expect(task.apiConversationHistory.length).toBe(1)
expect((task.apiConversationHistory[0].content as any[])[0].tool_use_id).toBe("tool-skip-wait")
})

it("should wait for assistantMessageSavedToHistory when flag is false", async () => {
const task = new Task({
provider: mockProvider,
apiConfiguration: mockApiConfig,
task: "test task",
startTask: false,
})

// Flag is false by default - assistant message not yet saved
expect(task.assistantMessageSavedToHistory).toBe(false)

// Set up pending tool result
task.userMessageContent = [
{
type: "tool_result",
tool_use_id: "tool-wait",
content: "Result when flag is false",
},
]

// Clear mock call history
mockPWaitFor.mockClear()

await task.flushPendingToolResultsToHistory()

// Should have called pWaitFor since flag was false
expect(mockPWaitFor).toHaveBeenCalled()

// Should still save the message (mock resolves immediately)
expect(task.apiConversationHistory.length).toBe(1)
})

it("should not flush when task is aborted during wait", async () => {
const task = new Task({
provider: mockProvider,
apiConfiguration: mockApiConfig,
task: "test task",
startTask: false,
})

// Flag is false - will need to wait
task.assistantMessageSavedToHistory = false

// Set up pending tool result
task.userMessageContent = [
{
type: "tool_result",
tool_use_id: "tool-aborted",
content: "Should not be saved",
},
]

// Set abort flag - this will cause the condition in pWaitFor to return true
// AND will cause early return after the wait
task.abort = true

await task.flushPendingToolResultsToHistory()

// Should not have saved anything since task was aborted
expect(task.apiConversationHistory.length).toBe(0)
})
})
Loading