From bc8e675e2b8284da2bde32c636d2cdddad963e5b Mon Sep 17 00:00:00 2001 From: Amit Saroj Date: Wed, 3 Jun 2026 22:36:46 +0530 Subject: [PATCH 1/3] fix(ai): normalize AG-UI snapshot messages to UIMessage[] in MESSAGES_SNAPSHOT handler --- .../ai/src/activities/chat/stream/processor.ts | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/packages/ai/src/activities/chat/stream/processor.ts b/packages/ai/src/activities/chat/stream/processor.ts index 78c93fdb0..546e2ccd3 100644 --- a/packages/ai/src/activities/chat/stream/processor.ts +++ b/packages/ai/src/activities/chat/stream/processor.ts @@ -17,7 +17,7 @@ * @see docs/chat-architecture.md — Canonical reference for AG-UI chunk ordering, * adapter contract, single-shot flows, and expected UIMessage output. */ -import { generateMessageId, uiMessageToModelMessages } from '../messages.js' +import { generateMessageId, modelMessageToUIMessage, uiMessageToModelMessages } from '../messages.js' import { normalizeToolResult } from '../../../utilities/tool-result' import { defaultJSONParser } from './json-parser' import { @@ -870,8 +870,16 @@ export class StreamProcessor { chunk: Extract, ): void { this.resetStreamState() - // AG-UI Message[] is compatible with UIMessage[] at runtime - this.messages = [...chunk.messages] as Array + // Normalize AG-UI messages to UIMessage[] to ensure each message has + // a `parts` array. AG-UI snapshot messages carry `content` but no + // `parts`, so casting them directly as UIMessage[] is unsafe and causes + // "Cannot read properties of undefined (reading 'find')" when any code + // later accesses message.parts (e.g. onToolCallStateChange devtools handler). + this.messages = chunk.messages.map((msg) => + 'parts' in msg + ? (msg as UIMessage) + : modelMessageToUIMessage(msg as any, generateMessageId()), + ) this.emitMessagesChange() } From b091adc7e871329fdef2ac37443199072c309b6b Mon Sep 17 00:00:00 2001 From: Amit Saroj Date: Wed, 3 Jun 2026 22:52:38 +0530 Subject: [PATCH 2/3] fix(ai): preserve original msg.id in MESSAGES_SNAPSHOT normalization --- packages/ai/src/activities/chat/stream/processor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/ai/src/activities/chat/stream/processor.ts b/packages/ai/src/activities/chat/stream/processor.ts index 546e2ccd3..9ec82e2e9 100644 --- a/packages/ai/src/activities/chat/stream/processor.ts +++ b/packages/ai/src/activities/chat/stream/processor.ts @@ -878,7 +878,7 @@ export class StreamProcessor { this.messages = chunk.messages.map((msg) => 'parts' in msg ? (msg as UIMessage) - : modelMessageToUIMessage(msg as any, generateMessageId()), + : modelMessageToUIMessage(msg as any, (msg as { id?: string }).id ?? generateMessageId()), ) this.emitMessagesChange() } From 99c4a8bea01fe2ab8619c550f60650dccec76f76 Mon Sep 17 00:00:00 2001 From: Amit Saroj Date: Wed, 10 Jun 2026 13:57:37 +0530 Subject: [PATCH 3/3] test(ai): add MESSAGES_SNAPSHOT normalization tests for AG-UI messages --- packages/ai/tests/stream-processor.test.ts | 189 +++++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/packages/ai/tests/stream-processor.test.ts b/packages/ai/tests/stream-processor.test.ts index bc6580520..65bad3070 100644 --- a/packages/ai/tests/stream-processor.test.ts +++ b/packages/ai/tests/stream-processor.test.ts @@ -2718,6 +2718,195 @@ describe('StreamProcessor', () => { expect(onStreamEnd).toHaveBeenCalledTimes(1) }) + + it('should normalize AG-UI messages without parts to UIMessage[] with parts array', () => { + // Regression: AG-UI MESSAGES_SNAPSHOT messages have shape { id, role, content } + // and lack the `parts` array that UIMessage requires. The old unsafe cast + // `as Array` masked this at compile time, causing + // "TypeError: Cannot read properties of undefined (reading 'find')" + // when any downstream code accessed message.parts. + const processor = new StreamProcessor() + + processor.processChunk({ + type: EventType.MESSAGES_SNAPSHOT, + messages: [ + { id: 'ag-1', role: 'user', content: 'Hello' }, + { id: 'ag-2', role: 'assistant', content: 'Hi there!' }, + ] as any, + timestamp: Date.now(), + }) + + const messages = processor.getMessages() + expect(messages).toHaveLength(2) + + // Every normalized message must have a `parts` array (never undefined) + for (const msg of messages) { + expect(Array.isArray(msg.parts)).toBe(true) + } + + // Text content should be surfaced as a TextPart + const userMsg = messages.find((m) => m.id === 'ag-1')! + const userTextPart = userMsg.parts.find((p) => p.type === 'text') + expect(userTextPart).toBeDefined() + expect((userTextPart as any).content).toBe('Hello') + + const assistantMsg = messages.find((m) => m.id === 'ag-2')! + const assistantTextPart = assistantMsg.parts.find((p) => p.type === 'text') + expect(assistantTextPart).toBeDefined() + expect((assistantTextPart as any).content).toBe('Hi there!') + }) + + it('should preserve original AG-UI message id after normalization', () => { + // Regression: an earlier version of the fix always called generateMessageId() + // which discarded the original AG-UI id and broke downstream event correlation + // (TEXT_MESSAGE_CONTENT, TOOL_CALL_*, ensureAssistantMessage). + const processor = new StreamProcessor() + + processor.processChunk({ + type: EventType.MESSAGES_SNAPSHOT, + messages: [ + { id: 'original-id-1', role: 'user', content: 'ping' }, + { id: 'original-id-2', role: 'assistant', content: 'pong' }, + ] as any, + timestamp: Date.now(), + }) + + const messages = processor.getMessages() + expect(messages[0]?.id).toBe('original-id-1') + expect(messages[1]?.id).toBe('original-id-2') + }) + + it('should not throw when accessing parts.find() on snapshot-normalized messages', () => { + // Directly reproduces the TypeError: Cannot read properties of undefined + // (reading 'find') crash that occurred in the onToolCallStateChange devtools + // handler in chat-client.ts after a MESSAGES_SNAPSHOT reset. + const onToolCallStateChange = vi.fn() + const processor = new StreamProcessor({ + events: { onToolCallStateChange }, + }) + + processor.processChunk({ + type: EventType.MESSAGES_SNAPSHOT, + messages: [ + { id: 'snap-user', role: 'user', content: 'Use the weather tool' }, + { id: 'snap-asst', role: 'assistant', content: '' }, + ] as any, + timestamp: Date.now(), + }) + + // Any subsequent event that calls message.parts.find() must not throw + expect(() => { + processor.processChunk({ + type: 'TOOL_CALL_START', + toolCallId: 'tc-1', + toolCallName: 'getWeather', + toolName: 'getWeather', + parentMessageId: 'snap-asst', + timestamp: Date.now(), + } as any) + }).not.toThrow() + }) + + it('should pass through messages that already have a parts array without re-normalizing', () => { + // UIMessages arriving in the snapshot (already UIMessage shape with parts) + // must be kept as-is; only raw AG-UI messages (no parts) need conversion. + const processor = new StreamProcessor() + + const existingUIMessages: Array = [ + { + id: 'ui-1', + role: 'user', + parts: [{ type: 'text', content: 'Already a UIMessage' }], + }, + { + id: 'ui-2', + role: 'assistant', + parts: [ + { type: 'text', content: 'With a tool call' }, + { + type: 'tool-call', + id: 'tc-existing', + name: 'myTool', + arguments: '{}', + state: 'complete', + } as any, + ], + }, + ] + + processor.processChunk({ + type: EventType.MESSAGES_SNAPSHOT, + messages: existingUIMessages, + timestamp: Date.now(), + }) + + const messages = processor.getMessages() + expect(messages).toHaveLength(2) + expect(messages[0]?.parts).toHaveLength(1) + expect(messages[0]?.parts[0]).toEqual({ + type: 'text', + content: 'Already a UIMessage', + }) + expect(messages[1]?.parts).toHaveLength(2) + expect((messages[1]?.parts[1] as any)?.id).toBe('tc-existing') + }) + + it('should generate a fallback id when AG-UI message has no id field', () => { + // Edge case: some AG-UI backends may omit the id field. + // The fix must fall back to generateMessageId() so the resulting + // UIMessage always has a valid non-empty id string. + const processor = new StreamProcessor() + + processor.processChunk({ + type: EventType.MESSAGES_SNAPSHOT, + messages: [{ role: 'user', content: 'No id here' }] as any, + timestamp: Date.now(), + }) + + const messages = processor.getMessages() + expect(messages).toHaveLength(1) + expect(typeof messages[0]?.id).toBe('string') + expect(messages[0]!.id.length).toBeGreaterThan(0) + }) + + it('should handle empty messages array without errors', () => { + const processor = new StreamProcessor() + processor.addUserMessage('hello') + + processor.processChunk({ + type: EventType.MESSAGES_SNAPSHOT, + messages: [], + timestamp: Date.now(), + }) + + expect(processor.getMessages()).toHaveLength(0) + }) + + it('should correctly route TEXT_MESSAGE_CONTENT after a snapshot with raw AG-UI messages', () => { + // After a MESSAGES_SNAPSHOT the stream may continue sending content + // for messages that were in the snapshot. Ensure that normalization + // does not break subsequent event routing via messageId. + const processor = new StreamProcessor() + + processor.processChunk({ + type: EventType.MESSAGES_SNAPSHOT, + messages: [ + { id: 'snap-user', role: 'user', content: 'Tell me a story' }, + { id: 'snap-asst', role: 'assistant', content: 'Once upon a ' }, + ] as any, + timestamp: Date.now(), + }) + + processor.processChunk(ev.textContent('time...', 'snap-asst')) + processor.processChunk(ev.runFinished('stop')) + processor.finalizeStream() + + const messages = processor.getMessages() + const asst = messages.find((m) => m.id === 'snap-asst')! + expect(asst).toBeDefined() + const textPart = asst.parts.find((p) => p.type === 'text') as any + expect(textPart?.content).toContain('time...') + }) }) describe('MESSAGES_SNAPSHOT resets transient state', () => {