Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions packages/ai/src/activities/chat/stream/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* @see docs/chat-architecture.md — Canonical reference for AG-UI chunk ordering,
* adapter contract, single-shot flows, and expected UIMessage output.
*/
import { generateMessageId, uiMessageToModelMessages } from '../messages.js'
import { generateMessageId, modelMessageToUIMessage, uiMessageToModelMessages } from '../messages.js'
import { normalizeToolResult } from '../../../utilities/tool-result'
import { defaultJSONParser } from './json-parser'
import {
Expand Down Expand Up @@ -870,8 +870,16 @@ export class StreamProcessor {
chunk: Extract<StreamChunk, { type: 'MESSAGES_SNAPSHOT' }>,
): void {
this.resetStreamState()
// AG-UI Message[] is compatible with UIMessage[] at runtime
this.messages = [...chunk.messages] as Array<UIMessage>
// Normalize AG-UI messages to UIMessage[] to ensure each message has
// a `parts` array. AG-UI snapshot messages carry `content` but no
// `parts`, so casting them directly as UIMessage[] is unsafe and causes
// "Cannot read properties of undefined (reading 'find')" when any code
// later accesses message.parts (e.g. onToolCallStateChange devtools handler).
this.messages = chunk.messages.map((msg) =>
'parts' in msg
? (msg as UIMessage)
: modelMessageToUIMessage(msg as any, (msg as { id?: string }).id ?? generateMessageId()),
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
this.emitMessagesChange()
}

Expand Down
189 changes: 189 additions & 0 deletions packages/ai/tests/stream-processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2718,6 +2718,195 @@ describe('StreamProcessor', () => {

expect(onStreamEnd).toHaveBeenCalledTimes(1)
})

it('should normalize AG-UI messages without parts to UIMessage[] with parts array', () => {
// Regression: AG-UI MESSAGES_SNAPSHOT messages have shape { id, role, content }
// and lack the `parts` array that UIMessage requires. The old unsafe cast
// `as Array<UIMessage>` masked this at compile time, causing
// "TypeError: Cannot read properties of undefined (reading 'find')"
// when any downstream code accessed message.parts.
const processor = new StreamProcessor()

processor.processChunk({
type: EventType.MESSAGES_SNAPSHOT,
messages: [
{ id: 'ag-1', role: 'user', content: 'Hello' },
{ id: 'ag-2', role: 'assistant', content: 'Hi there!' },
] as any,
timestamp: Date.now(),
})

const messages = processor.getMessages()
expect(messages).toHaveLength(2)

// Every normalized message must have a `parts` array (never undefined)
for (const msg of messages) {
expect(Array.isArray(msg.parts)).toBe(true)
}

// Text content should be surfaced as a TextPart
const userMsg = messages.find((m) => m.id === 'ag-1')!
const userTextPart = userMsg.parts.find((p) => p.type === 'text')
expect(userTextPart).toBeDefined()
expect((userTextPart as any).content).toBe('Hello')

const assistantMsg = messages.find((m) => m.id === 'ag-2')!
const assistantTextPart = assistantMsg.parts.find((p) => p.type === 'text')
expect(assistantTextPart).toBeDefined()
expect((assistantTextPart as any).content).toBe('Hi there!')
})

it('should preserve original AG-UI message id after normalization', () => {
// Regression: an earlier version of the fix always called generateMessageId()
// which discarded the original AG-UI id and broke downstream event correlation
// (TEXT_MESSAGE_CONTENT, TOOL_CALL_*, ensureAssistantMessage).
const processor = new StreamProcessor()

processor.processChunk({
type: EventType.MESSAGES_SNAPSHOT,
messages: [
{ id: 'original-id-1', role: 'user', content: 'ping' },
{ id: 'original-id-2', role: 'assistant', content: 'pong' },
] as any,
timestamp: Date.now(),
})

const messages = processor.getMessages()
expect(messages[0]?.id).toBe('original-id-1')
expect(messages[1]?.id).toBe('original-id-2')
})

it('should not throw when accessing parts.find() on snapshot-normalized messages', () => {
// Directly reproduces the TypeError: Cannot read properties of undefined
// (reading 'find') crash that occurred in the onToolCallStateChange devtools
// handler in chat-client.ts after a MESSAGES_SNAPSHOT reset.
const onToolCallStateChange = vi.fn()
const processor = new StreamProcessor({
events: { onToolCallStateChange },
})

processor.processChunk({
type: EventType.MESSAGES_SNAPSHOT,
messages: [
{ id: 'snap-user', role: 'user', content: 'Use the weather tool' },
{ id: 'snap-asst', role: 'assistant', content: '' },
] as any,
timestamp: Date.now(),
})

// Any subsequent event that calls message.parts.find() must not throw
expect(() => {
processor.processChunk({
type: 'TOOL_CALL_START',
toolCallId: 'tc-1',
toolCallName: 'getWeather',
toolName: 'getWeather',
parentMessageId: 'snap-asst',
timestamp: Date.now(),
} as any)
}).not.toThrow()
})

it('should pass through messages that already have a parts array without re-normalizing', () => {
// UIMessages arriving in the snapshot (already UIMessage shape with parts)
// must be kept as-is; only raw AG-UI messages (no parts) need conversion.
const processor = new StreamProcessor()

const existingUIMessages: Array<UIMessage> = [
{
id: 'ui-1',
role: 'user',
parts: [{ type: 'text', content: 'Already a UIMessage' }],
},
{
id: 'ui-2',
role: 'assistant',
parts: [
{ type: 'text', content: 'With a tool call' },
{
type: 'tool-call',
id: 'tc-existing',
name: 'myTool',
arguments: '{}',
state: 'complete',
} as any,
],
},
]

processor.processChunk({
type: EventType.MESSAGES_SNAPSHOT,
messages: existingUIMessages,
timestamp: Date.now(),
})

const messages = processor.getMessages()
expect(messages).toHaveLength(2)
expect(messages[0]?.parts).toHaveLength(1)
expect(messages[0]?.parts[0]).toEqual({
type: 'text',
content: 'Already a UIMessage',
})
expect(messages[1]?.parts).toHaveLength(2)
expect((messages[1]?.parts[1] as any)?.id).toBe('tc-existing')
})

it('should generate a fallback id when AG-UI message has no id field', () => {
// Edge case: some AG-UI backends may omit the id field.
// The fix must fall back to generateMessageId() so the resulting
// UIMessage always has a valid non-empty id string.
const processor = new StreamProcessor()

processor.processChunk({
type: EventType.MESSAGES_SNAPSHOT,
messages: [{ role: 'user', content: 'No id here' }] as any,
timestamp: Date.now(),
})

const messages = processor.getMessages()
expect(messages).toHaveLength(1)
expect(typeof messages[0]?.id).toBe('string')
expect(messages[0]!.id.length).toBeGreaterThan(0)
})

it('should handle empty messages array without errors', () => {
const processor = new StreamProcessor()
processor.addUserMessage('hello')

processor.processChunk({
type: EventType.MESSAGES_SNAPSHOT,
messages: [],
timestamp: Date.now(),
})

expect(processor.getMessages()).toHaveLength(0)
})

it('should correctly route TEXT_MESSAGE_CONTENT after a snapshot with raw AG-UI messages', () => {
// After a MESSAGES_SNAPSHOT the stream may continue sending content
// for messages that were in the snapshot. Ensure that normalization
// does not break subsequent event routing via messageId.
const processor = new StreamProcessor()

processor.processChunk({
type: EventType.MESSAGES_SNAPSHOT,
messages: [
{ id: 'snap-user', role: 'user', content: 'Tell me a story' },
{ id: 'snap-asst', role: 'assistant', content: 'Once upon a ' },
] as any,
timestamp: Date.now(),
})

processor.processChunk(ev.textContent('time...', 'snap-asst'))
processor.processChunk(ev.runFinished('stop'))
processor.finalizeStream()

const messages = processor.getMessages()
const asst = messages.find((m) => m.id === 'snap-asst')!
expect(asst).toBeDefined()
const textPart = asst.parts.find((p) => p.type === 'text') as any
expect(textPart?.content).toContain('time...')
})
})

describe('MESSAGES_SNAPSHOT resets transient state', () => {
Expand Down