Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 84 additions & 97 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
type ExecutionContext,
getNextExecutionOrder,
type NormalizedBlockOutput,
type StreamingExecution,
} from '@/executor/types'
import { streamingResponseFormatProcessor } from '@/executor/utils'
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
Expand Down Expand Up @@ -140,7 +141,7 @@ export class BlockExecutor {

let normalizedOutput: NormalizedBlockOutput
if (isStreamingExecution) {
const streamingExec = output as { stream: ReadableStream; execution: any }
const streamingExec = output as StreamingExecution

if (ctx.onStream) {
await this.handleStreamingExecution(
Expand Down Expand Up @@ -602,7 +603,7 @@ export class BlockExecutor {
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
streamingExec: { stream: ReadableStream; execution: any },
streamingExec: StreamingExecution,
resolvedInputs: Record<string, any>,
selectedOutputs: string[]
): Promise<void> {
Expand All @@ -613,129 +614,115 @@ export class BlockExecutor {
(block.config?.params as Record<string, any> | undefined)?.responseFormat ??
(block.config as Record<string, any> | undefined)?.responseFormat

const stream = streamingExec.stream
if (typeof stream.tee !== 'function') {
await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs)
return
}
const sourceReader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
const accumulated: string[] = []
let drainError: unknown
let sourceFullyDrained = false

const [clientStream, executorStream] = stream.tee()
const clientSource = new ReadableStream<Uint8Array>({
async pull(controller) {
try {
const { done, value } = await sourceReader.read()
if (done) {
const tail = decoder.decode()
if (tail) accumulated.push(tail)
sourceFullyDrained = true
controller.close()
return
}
accumulated.push(decoder.decode(value, { stream: true }))
controller.enqueue(value)
} catch (error) {
drainError = error
controller.error(error)
}
},
async cancel(reason) {
try {
await sourceReader.cancel(reason)
} catch {}
},
})

const processedClientStream = streamingResponseFormatProcessor.processStream(
clientStream,
blockId,
selectedOutputs,
responseFormat
)

const clientStreamingExec = {
...streamingExec,
stream: processedClientStream,
}

const executorConsumption = this.consumeExecutorStream(
executorStream,
streamingExec,
blockId,
responseFormat
)

const clientConsumption = (async () => {
try {
await ctx.onStream?.(clientStreamingExec)
} catch (error) {
this.execLogger.error('Error in onStream callback', { blockId, error })
// Cancel the client stream to release the tee'd buffer
await processedClientStream.cancel().catch(() => {})
}
})()

await Promise.all([clientConsumption, executorConsumption])
}

private async forwardStream(
ctx: ExecutionContext,
blockId: string,
streamingExec: { stream: ReadableStream; execution: any },
stream: ReadableStream,
responseFormat: any,
selectedOutputs: string[]
): Promise<void> {
const processedStream = streamingResponseFormatProcessor.processStream(
stream,
clientSource,
blockId,
selectedOutputs,
responseFormat
)

try {
await ctx.onStream?.({
...streamingExec,
stream: processedStream,
stream: processedClientStream,
execution: streamingExec.execution,
})
} catch (error) {
this.execLogger.error('Error in onStream callback', { blockId, error })
await processedStream.cancel().catch(() => {})
}
}

private async consumeExecutorStream(
stream: ReadableStream,
streamingExec: { execution: any },
blockId: string,
responseFormat: any
): Promise<void> {
const reader = stream.getReader()
const decoder = new TextDecoder()
const chunks: string[] = []

try {
while (true) {
const { done, value } = await reader.read()
if (done) break
chunks.push(decoder.decode(value, { stream: true }))
}
const tail = decoder.decode()
if (tail) chunks.push(tail)
} catch (error) {
this.execLogger.error('Error reading executor stream for block', { blockId, error })
await processedClientStream.cancel().catch(() => {})
} finally {
try {
await reader.cancel().catch(() => {})
sourceReader.releaseLock()
} catch {}
}

const fullContent = chunks.join('')
Comment thread
TheodoreSpeaks marked this conversation as resolved.
if (drainError) {
this.execLogger.error('Error reading stream for block', { blockId, error: drainError })
return
}

// If the onStream consumer exited before the source drained (e.g. it caught
// an internal error and returned normally), `accumulated` holds a truncated
// response. Persisting that to memory or setting it as the block output
// would corrupt downstream state — skip and log instead.
if (!sourceFullyDrained) {
this.execLogger.warn(
'Stream consumer exited before source drained; skipping content persistence',
{
blockId,
}
)
return
}

const fullContent = accumulated.join('')
if (!fullContent) {
return
}

const executionOutput = streamingExec.execution?.output
if (!executionOutput || typeof executionOutput !== 'object') {
return
if (executionOutput && typeof executionOutput === 'object') {
let parsedForFormat = false
if (responseFormat) {
try {
const parsed = JSON.parse(fullContent.trim())
streamingExec.execution.output = {
...parsed,
tokens: executionOutput.tokens,
toolCalls: executionOutput.toolCalls,
providerTiming: executionOutput.providerTiming,
cost: executionOutput.cost,
model: executionOutput.model,
}
parsedForFormat = true
} catch (error) {
this.execLogger.warn('Failed to parse streamed content for response format', {
blockId,
error,
})
}
}
if (!parsedForFormat) {
executionOutput.content = fullContent
}
}

if (responseFormat) {
if (streamingExec.onFullContent) {
try {
const parsed = JSON.parse(fullContent.trim())

streamingExec.execution.output = {
...parsed,
tokens: executionOutput.tokens,
toolCalls: executionOutput.toolCalls,
providerTiming: executionOutput.providerTiming,
cost: executionOutput.cost,
model: executionOutput.model,
}
return
await streamingExec.onFullContent(fullContent)
} catch (error) {
this.execLogger.warn('Failed to parse streamed content for response format', {
blockId,
error,
})
this.execLogger.error('onFullContent callback failed', { blockId, error })
}
}
Comment on lines +720 to +726
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 onFullContent called unconditionally with empty string

onFullContent is invoked regardless of whether fullContent is empty, while the preceding block that sets executionOutput.content is guarded by if (fullContent). The current agent handler defensively checks !content.trim(), but any future onFullContent implementor might not, leading to spurious empty-content persistence calls.

Suggested change
if (streamingExec.onFullContent) {
try {
await streamingExec.onFullContent(fullContent)
} catch (error) {
this.execLogger.error('onFullContent callback failed', { blockId, error })
}
}
if (streamingExec.onFullContent && fullContent) {
try {
await streamingExec.onFullContent(fullContent)
} catch (error) {
this.execLogger.error('onFullContent callback failed', { blockId, error })
}
}


executionOutput.content = fullContent
}
}
10 changes: 9 additions & 1 deletion apps/sim/executor/handlers/agent/agent-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -958,8 +958,16 @@ export class AgentBlockHandler implements BlockHandler {
streamingExec: StreamingExecution
): StreamingExecution {
return {
stream: memoryService.wrapStreamForPersistence(streamingExec.stream, ctx, inputs),
stream: streamingExec.stream,
execution: streamingExec.execution,
onFullContent: async (content: string) => {
if (!content.trim()) return
try {
await memoryService.appendToMemory(ctx, inputs, { role: 'assistant', content })
} catch (error) {
logger.error('Failed to persist streaming response:', error)
}
},
}
}

Expand Down
29 changes: 0 additions & 29 deletions apps/sim/executor/handlers/agent/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,35 +111,6 @@ export class Memory {
})
}

wrapStreamForPersistence(
stream: ReadableStream<Uint8Array>,
ctx: ExecutionContext,
inputs: AgentInputs
): ReadableStream<Uint8Array> {
const chunks: string[] = []
const decoder = new TextDecoder()

const transformStream = new TransformStream<Uint8Array, Uint8Array>({
transform: (chunk, controller) => {
controller.enqueue(chunk)
const decoded = decoder.decode(chunk, { stream: true })
chunks.push(decoded)
},

flush: () => {
const content = chunks.join('')
if (content.trim()) {
this.appendToMemory(ctx, inputs, {
role: 'assistant',
content,
}).catch((error) => logger.error('Failed to persist streaming response:', error))
}
},
})

return stream.pipeThrough(transformStream)
}

private requireWorkspaceId(ctx: ExecutionContext): string {
if (!ctx.workspaceId) {
throw new Error('workspaceId is required for memory operations')
Expand Down
6 changes: 6 additions & 0 deletions apps/sim/executor/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,12 @@ export interface ExecutionResult {
export interface StreamingExecution {
stream: ReadableStream
execution: ExecutionResult & { isStreaming?: boolean }
/**
* Invoked with the assembled response text after the stream drains. Lets agent
* blocks persist the full response without interposing a TransformStream on a
* fetch-backed source — that pattern amplifies memory on Bun via #28035.
*/
onFullContent?: (content: string) => void | Promise<void>
}

export interface BlockExecutor {
Expand Down
Loading