From 3eef58b56e507d30312d63a38cc8f499ff44206f Mon Sep 17 00:00:00 2001 From: biubiupiu <1998bygone@gmail.com> Date: Sun, 14 Sep 2025 01:00:18 +0800 Subject: [PATCH] feat(parser): add tool-input-start/end events to morphXmlProtocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for tool-input-start and tool-input-end events in morphXmlProtocol's createStreamParser to track tool call lifecycle. Events are emitted with matching IDs for proper correlation between start, end, and tool-call events. - Emit tool-input-start when tool opening tag is detected - Emit tool-input-end when tool parsing completes (success or error) - Handle incomplete tool calls at stream end - Add comprehensive test coverage for all scenarios 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../src/protocols/morph-xml-protocol.ts | 40 ++- .../xml-protocol.stream.input-events.test.ts | 230 ++++++++++++++++++ 2 files changed, 267 insertions(+), 3 deletions(-) create mode 100644 packages/parser/tests/protocols/xml-protocol.stream.input-events.test.ts diff --git a/packages/parser/src/protocols/morph-xml-protocol.ts b/packages/parser/src/protocols/morph-xml-protocol.ts index a1977e97..9b3ed53b 100644 --- a/packages/parser/src/protocols/morph-xml-protocol.ts +++ b/packages/parser/src/protocols/morph-xml-protocol.ts @@ -119,7 +119,8 @@ export const morphXmlProtocol = (): ToolCallProtocol => ({ ? Math.max(...toolNames.map(n => `<${n}>`.length)) : 0; let buffer = ""; - let currentToolCall: { name: string; content: string } | null = null; + let currentToolCall: { name: string; content: string; id: string } | null = + null; let currentTextId: string | null = null; const flushText = ( @@ -178,9 +179,16 @@ export const morphXmlProtocol = (): ToolCallProtocol => ({ // No additional fallback: RXML handles raw content for string fields flushText(controller); + + // Emit tool-input-end event + controller.enqueue({ + type: "tool-input-end", + id: currentToolCall.id, + }); + controller.enqueue({ type: "tool-call", - toolCallId: generateId(), + toolCallId: currentToolCall.id, toolName: currentToolCall.name, input: JSON.stringify(parsed), }); @@ -200,6 +208,13 @@ export const morphXmlProtocol = (): ToolCallProtocol => ({ toolName: currentToolCall.name, error, }); + + // Emit tool-input-end event even on error + controller.enqueue({ + type: "tool-input-end", + id: currentToolCall.id, + }); + flushText(controller, originalCallText); } currentToolCall = null; @@ -233,7 +248,20 @@ export const morphXmlProtocol = (): ToolCallProtocol => ({ buffer = buffer.substring( earliestStartTagIndex + startTag.length ); - currentToolCall = { name: earliestToolName, content: "" }; + + const toolCallId = generateId(); + currentToolCall = { + name: earliestToolName, + content: "", + id: toolCallId, + }; + + // Emit tool-input-start event + controller.enqueue({ + type: "tool-input-start", + id: toolCallId, + toolName: earliestToolName, + }); } else { // No start tag currently in buffer. Stream out as much as possible // while keeping a small tail to catch a tag split across chunks. @@ -253,6 +281,12 @@ export const morphXmlProtocol = (): ToolCallProtocol => ({ }, flush(controller) { if (currentToolCall) { + // Emit tool-input-end for incomplete tool call + controller.enqueue({ + type: "tool-input-end", + id: currentToolCall.id, + }); + const unfinishedCall = `<${currentToolCall.name}>${buffer}`; flushText(controller, unfinishedCall); } else if (buffer) { diff --git a/packages/parser/tests/protocols/xml-protocol.stream.input-events.test.ts b/packages/parser/tests/protocols/xml-protocol.stream.input-events.test.ts new file mode 100644 index 00000000..9f89cfbf --- /dev/null +++ b/packages/parser/tests/protocols/xml-protocol.stream.input-events.test.ts @@ -0,0 +1,230 @@ +import type { LanguageModelV2StreamPart } from "@ai-sdk/provider"; +import { describe, expect, it, vi } from "vitest"; + +import { morphXmlProtocol } from "@/protocols/morph-xml-protocol"; + +vi.mock("@ai-sdk/provider-utils", () => ({ + generateId: vi.fn(() => "mock-id"), +})); + +function collect(stream: ReadableStream) { + const out: LanguageModelV2StreamPart[] = []; + return (async () => { + for await (const c of stream) out.push(c); + return out; + })(); +} + +const tools = [ + { + type: "function", + name: "get_weather", + description: "", + inputSchema: { type: "object" }, + }, +] as any; + +describe("morphXmlProtocol tool-input events", () => { + it("emits tool-input-start and tool-input-end events for successful tool call", async () => { + const protocol = morphXmlProtocol(); + const transformer = protocol.createStreamParser({ tools }); + const rs = new ReadableStream({ + start(ctrl) { + ctrl.enqueue({ type: "text-delta", id: "1", delta: "prefix " }); + ctrl.enqueue({ type: "text-delta", id: "1", delta: "" }); + ctrl.enqueue({ + type: "text-delta", + id: "1", + delta: "NY", + }); + ctrl.enqueue({ type: "text-delta", id: "1", delta: "" }); + ctrl.enqueue({ type: "text-delta", id: "1", delta: " suffix" }); + ctrl.enqueue({ + type: "finish", + finishReason: "stop", + usage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, + }); + ctrl.close(); + }, + }); + + const out = await collect(rs.pipeThrough(transformer)); + + // Find tool-input events + const toolInputStart = out.find(c => c.type === "tool-input-start") as any; + const toolInputEnd = out.find(c => c.type === "tool-input-end") as any; + const toolCall = out.find(c => c.type === "tool-call") as any; + + // Verify tool-input-start event + expect(toolInputStart).toBeTruthy(); + expect(toolInputStart.type).toBe("tool-input-start"); + expect(toolInputStart.id).toBe("mock-id"); + expect(toolInputStart.toolName).toBe("get_weather"); + + // Verify tool-input-end event + expect(toolInputEnd).toBeTruthy(); + expect(toolInputEnd.type).toBe("tool-input-end"); + expect(toolInputEnd.id).toBe("mock-id"); + + // Verify tool-call event uses same ID + expect(toolCall).toBeTruthy(); + expect(toolCall.toolCallId).toBe("mock-id"); + expect(toolCall.toolName).toBe("get_weather"); + + // Verify event order: start -> end -> tool-call + const eventIndexes = { + start: out.findIndex(c => c.type === "tool-input-start"), + end: out.findIndex(c => c.type === "tool-input-end"), + call: out.findIndex(c => c.type === "tool-call"), + }; + expect(eventIndexes.start).toBeLessThan(eventIndexes.end); + expect(eventIndexes.end).toBeLessThan(eventIndexes.call); + }); + + it("emits tool-input-start and tool-input-end events for failed tool call", async () => { + const onError = vi.fn(); + const protocol = morphXmlProtocol(); + const transformer = protocol.createStreamParser({ + tools, + options: { onError }, + }); + + const rs = new ReadableStream({ + start(ctrl) { + ctrl.enqueue({ type: "text-delta", id: "1", delta: "" }); + ctrl.enqueue({ + type: "text-delta", + id: "1", + delta: "malformed xml", + }); + ctrl.enqueue({ type: "text-delta", id: "1", delta: "" }); + ctrl.enqueue({ + type: "finish", + finishReason: "stop", + usage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, + }); + ctrl.close(); + }, + }); + + const out = await collect(rs.pipeThrough(transformer)); + + // Find tool-input events + const toolInputStart = out.find(c => c.type === "tool-input-start") as any; + const toolInputEnd = out.find(c => c.type === "tool-input-end") as any; + + // Verify both events are emitted even on error + expect(toolInputStart).toBeTruthy(); + expect(toolInputStart.type).toBe("tool-input-start"); + expect(toolInputStart.id).toBe("mock-id"); + expect(toolInputStart.toolName).toBe("get_weather"); + + expect(toolInputEnd).toBeTruthy(); + expect(toolInputEnd.type).toBe("tool-input-end"); + expect(toolInputEnd.id).toBe("mock-id"); + + // Verify error callback was called + expect(onError).toHaveBeenCalled(); + }); + + it("emits tool-input-start and tool-input-end for incomplete tool call at stream end", async () => { + const protocol = morphXmlProtocol(); + const transformer = protocol.createStreamParser({ tools }); + + const rs = new ReadableStream({ + start(ctrl) { + ctrl.enqueue({ type: "text-delta", id: "1", delta: "" }); + ctrl.enqueue({ + type: "text-delta", + id: "1", + delta: "NY", + }); + // Note: no closing tag - incomplete tool call + ctrl.enqueue({ + type: "finish", + finishReason: "stop", + usage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, + }); + ctrl.close(); + }, + }); + + const out = await collect(rs.pipeThrough(transformer)); + + // Find tool-input events + const toolInputStart = out.find(c => c.type === "tool-input-start") as any; + const toolInputEnd = out.find(c => c.type === "tool-input-end") as any; + + // Verify both events are emitted even for incomplete calls + expect(toolInputStart).toBeTruthy(); + expect(toolInputStart.type).toBe("tool-input-start"); + expect(toolInputStart.id).toBe("mock-id"); + expect(toolInputStart.toolName).toBe("get_weather"); + + expect(toolInputEnd).toBeTruthy(); + expect(toolInputEnd.type).toBe("tool-input-end"); + expect(toolInputEnd.id).toBe("mock-id"); + + // Verify the incomplete call content is emitted as text + const textParts = out + .filter(c => c.type === "text-delta") + .map((c: any) => c.delta); + const fullText = textParts.join(""); + expect(fullText).toContain(""); + expect(fullText).toContain("NY"); + }); + + it("emits multiple paired tool-input events for multiple tool calls", async () => { + const protocol = morphXmlProtocol(); + const transformer = protocol.createStreamParser({ tools }); + + const rs = new ReadableStream({ + start(ctrl) { + ctrl.enqueue({ + type: "text-delta", + id: "1", + delta: "NY", + }); + ctrl.enqueue({ type: "text-delta", id: "1", delta: " and " }); + ctrl.enqueue({ + type: "text-delta", + id: "1", + delta: "SF", + }); + ctrl.enqueue({ + type: "finish", + finishReason: "stop", + usage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, + }); + ctrl.close(); + }, + }); + + const out = await collect(rs.pipeThrough(transformer)); + + // Find all tool-input events + const toolInputStarts = out.filter(c => c.type === "tool-input-start"); + const toolInputEnds = out.filter(c => c.type === "tool-input-end"); + const toolCalls = out.filter(c => c.type === "tool-call"); + + // Should have at least 1 tool call (implementation may coalesce) + expect(toolInputStarts.length).toBeGreaterThanOrEqual(1); + expect(toolInputEnds.length).toBeGreaterThanOrEqual(1); + expect(toolCalls.length).toBeGreaterThanOrEqual(1); + + // Each start should have a corresponding end + expect(toolInputStarts.length).toBe(toolInputEnds.length); + expect(toolInputStarts.length).toBe(toolCalls.length); + + // Verify IDs match between start, end, and tool-call events + for (let i = 0; i < toolInputStarts.length; i++) { + const start = toolInputStarts[i] as any; + const end = toolInputEnds[i] as any; + const call = toolCalls[i] as any; + + expect(start.id).toBe(end.id); + expect(start.id).toBe(call.toolCallId); + expect(start.toolName).toBe("get_weather"); + } + }); +});