-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Avoid bun memory leak bug from TransformStream #4255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: staging
Are you sure you want to change the base?
Changes from all commits
43a727d
c29447a
bf2f9af
c6272d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -34,6 +34,7 @@ import { | |||||||||||||||||||||||||||||
| type ExecutionContext, | ||||||||||||||||||||||||||||||
| getNextExecutionOrder, | ||||||||||||||||||||||||||||||
| type NormalizedBlockOutput, | ||||||||||||||||||||||||||||||
| type StreamingExecution, | ||||||||||||||||||||||||||||||
| } from '@/executor/types' | ||||||||||||||||||||||||||||||
| import { streamingResponseFormatProcessor } from '@/executor/utils' | ||||||||||||||||||||||||||||||
| import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors' | ||||||||||||||||||||||||||||||
|
|
@@ -140,7 +141,7 @@ export class BlockExecutor { | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| let normalizedOutput: NormalizedBlockOutput | ||||||||||||||||||||||||||||||
| if (isStreamingExecution) { | ||||||||||||||||||||||||||||||
| const streamingExec = output as { stream: ReadableStream; execution: any } | ||||||||||||||||||||||||||||||
| const streamingExec = output as StreamingExecution | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if (ctx.onStream) { | ||||||||||||||||||||||||||||||
| await this.handleStreamingExecution( | ||||||||||||||||||||||||||||||
|
|
@@ -602,7 +603,7 @@ export class BlockExecutor { | |||||||||||||||||||||||||||||
| ctx: ExecutionContext, | ||||||||||||||||||||||||||||||
| node: DAGNode, | ||||||||||||||||||||||||||||||
| block: SerializedBlock, | ||||||||||||||||||||||||||||||
| streamingExec: { stream: ReadableStream; execution: any }, | ||||||||||||||||||||||||||||||
| streamingExec: StreamingExecution, | ||||||||||||||||||||||||||||||
| resolvedInputs: Record<string, any>, | ||||||||||||||||||||||||||||||
| selectedOutputs: string[] | ||||||||||||||||||||||||||||||
| ): Promise<void> { | ||||||||||||||||||||||||||||||
|
|
@@ -613,129 +614,115 @@ export class BlockExecutor { | |||||||||||||||||||||||||||||
| (block.config?.params as Record<string, any> | undefined)?.responseFormat ?? | ||||||||||||||||||||||||||||||
| (block.config as Record<string, any> | undefined)?.responseFormat | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const stream = streamingExec.stream | ||||||||||||||||||||||||||||||
| if (typeof stream.tee !== 'function') { | ||||||||||||||||||||||||||||||
| await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs) | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| const sourceReader = streamingExec.stream.getReader() | ||||||||||||||||||||||||||||||
| const decoder = new TextDecoder() | ||||||||||||||||||||||||||||||
| const accumulated: string[] = [] | ||||||||||||||||||||||||||||||
| let drainError: unknown | ||||||||||||||||||||||||||||||
| let sourceFullyDrained = false | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const [clientStream, executorStream] = stream.tee() | ||||||||||||||||||||||||||||||
| const clientSource = new ReadableStream<Uint8Array>({ | ||||||||||||||||||||||||||||||
| async pull(controller) { | ||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||
| const { done, value } = await sourceReader.read() | ||||||||||||||||||||||||||||||
| if (done) { | ||||||||||||||||||||||||||||||
| const tail = decoder.decode() | ||||||||||||||||||||||||||||||
| if (tail) accumulated.push(tail) | ||||||||||||||||||||||||||||||
| sourceFullyDrained = true | ||||||||||||||||||||||||||||||
| controller.close() | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| accumulated.push(decoder.decode(value, { stream: true })) | ||||||||||||||||||||||||||||||
| controller.enqueue(value) | ||||||||||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||||||||||
| drainError = error | ||||||||||||||||||||||||||||||
| controller.error(error) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||
| async cancel(reason) { | ||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||
| await sourceReader.cancel(reason) | ||||||||||||||||||||||||||||||
| } catch {} | ||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const processedClientStream = streamingResponseFormatProcessor.processStream( | ||||||||||||||||||||||||||||||
| clientStream, | ||||||||||||||||||||||||||||||
| blockId, | ||||||||||||||||||||||||||||||
| selectedOutputs, | ||||||||||||||||||||||||||||||
| responseFormat | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const clientStreamingExec = { | ||||||||||||||||||||||||||||||
| ...streamingExec, | ||||||||||||||||||||||||||||||
| stream: processedClientStream, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const executorConsumption = this.consumeExecutorStream( | ||||||||||||||||||||||||||||||
| executorStream, | ||||||||||||||||||||||||||||||
| streamingExec, | ||||||||||||||||||||||||||||||
| blockId, | ||||||||||||||||||||||||||||||
| responseFormat | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const clientConsumption = (async () => { | ||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||
| await ctx.onStream?.(clientStreamingExec) | ||||||||||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||||||||||
| this.execLogger.error('Error in onStream callback', { blockId, error }) | ||||||||||||||||||||||||||||||
| // Cancel the client stream to release the tee'd buffer | ||||||||||||||||||||||||||||||
| await processedClientStream.cancel().catch(() => {}) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| })() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| await Promise.all([clientConsumption, executorConsumption]) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| private async forwardStream( | ||||||||||||||||||||||||||||||
| ctx: ExecutionContext, | ||||||||||||||||||||||||||||||
| blockId: string, | ||||||||||||||||||||||||||||||
| streamingExec: { stream: ReadableStream; execution: any }, | ||||||||||||||||||||||||||||||
| stream: ReadableStream, | ||||||||||||||||||||||||||||||
| responseFormat: any, | ||||||||||||||||||||||||||||||
| selectedOutputs: string[] | ||||||||||||||||||||||||||||||
| ): Promise<void> { | ||||||||||||||||||||||||||||||
| const processedStream = streamingResponseFormatProcessor.processStream( | ||||||||||||||||||||||||||||||
| stream, | ||||||||||||||||||||||||||||||
| clientSource, | ||||||||||||||||||||||||||||||
| blockId, | ||||||||||||||||||||||||||||||
| selectedOutputs, | ||||||||||||||||||||||||||||||
| responseFormat | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||
| await ctx.onStream?.({ | ||||||||||||||||||||||||||||||
| ...streamingExec, | ||||||||||||||||||||||||||||||
| stream: processedStream, | ||||||||||||||||||||||||||||||
| stream: processedClientStream, | ||||||||||||||||||||||||||||||
| execution: streamingExec.execution, | ||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||||||||||
| this.execLogger.error('Error in onStream callback', { blockId, error }) | ||||||||||||||||||||||||||||||
| await processedStream.cancel().catch(() => {}) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| private async consumeExecutorStream( | ||||||||||||||||||||||||||||||
| stream: ReadableStream, | ||||||||||||||||||||||||||||||
| streamingExec: { execution: any }, | ||||||||||||||||||||||||||||||
| blockId: string, | ||||||||||||||||||||||||||||||
| responseFormat: any | ||||||||||||||||||||||||||||||
| ): Promise<void> { | ||||||||||||||||||||||||||||||
| const reader = stream.getReader() | ||||||||||||||||||||||||||||||
| const decoder = new TextDecoder() | ||||||||||||||||||||||||||||||
| const chunks: string[] = [] | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||
| while (true) { | ||||||||||||||||||||||||||||||
| const { done, value } = await reader.read() | ||||||||||||||||||||||||||||||
| if (done) break | ||||||||||||||||||||||||||||||
| chunks.push(decoder.decode(value, { stream: true })) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| const tail = decoder.decode() | ||||||||||||||||||||||||||||||
| if (tail) chunks.push(tail) | ||||||||||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||||||||||
| this.execLogger.error('Error reading executor stream for block', { blockId, error }) | ||||||||||||||||||||||||||||||
| await processedClientStream.cancel().catch(() => {}) | ||||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||
| await reader.cancel().catch(() => {}) | ||||||||||||||||||||||||||||||
| sourceReader.releaseLock() | ||||||||||||||||||||||||||||||
| } catch {} | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const fullContent = chunks.join('') | ||||||||||||||||||||||||||||||
| if (drainError) { | ||||||||||||||||||||||||||||||
| this.execLogger.error('Error reading stream for block', { blockId, error: drainError }) | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // If the onStream consumer exited before the source drained (e.g. it caught | ||||||||||||||||||||||||||||||
| // an internal error and returned normally), `accumulated` holds a truncated | ||||||||||||||||||||||||||||||
| // response. Persisting that to memory or setting it as the block output | ||||||||||||||||||||||||||||||
| // would corrupt downstream state — skip and log instead. | ||||||||||||||||||||||||||||||
| if (!sourceFullyDrained) { | ||||||||||||||||||||||||||||||
| this.execLogger.warn( | ||||||||||||||||||||||||||||||
| 'Stream consumer exited before source drained; skipping content persistence', | ||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||
| blockId, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const fullContent = accumulated.join('') | ||||||||||||||||||||||||||||||
| if (!fullContent) { | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const executionOutput = streamingExec.execution?.output | ||||||||||||||||||||||||||||||
| if (!executionOutput || typeof executionOutput !== 'object') { | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
| if (executionOutput && typeof executionOutput === 'object') { | ||||||||||||||||||||||||||||||
| let parsedForFormat = false | ||||||||||||||||||||||||||||||
| if (responseFormat) { | ||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||
| const parsed = JSON.parse(fullContent.trim()) | ||||||||||||||||||||||||||||||
| streamingExec.execution.output = { | ||||||||||||||||||||||||||||||
| ...parsed, | ||||||||||||||||||||||||||||||
| tokens: executionOutput.tokens, | ||||||||||||||||||||||||||||||
| toolCalls: executionOutput.toolCalls, | ||||||||||||||||||||||||||||||
| providerTiming: executionOutput.providerTiming, | ||||||||||||||||||||||||||||||
| cost: executionOutput.cost, | ||||||||||||||||||||||||||||||
| model: executionOutput.model, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| parsedForFormat = true | ||||||||||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||||||||||
| this.execLogger.warn('Failed to parse streamed content for response format', { | ||||||||||||||||||||||||||||||
| blockId, | ||||||||||||||||||||||||||||||
| error, | ||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| if (!parsedForFormat) { | ||||||||||||||||||||||||||||||
| executionOutput.content = fullContent | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if (responseFormat) { | ||||||||||||||||||||||||||||||
| if (streamingExec.onFullContent) { | ||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||
| const parsed = JSON.parse(fullContent.trim()) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| streamingExec.execution.output = { | ||||||||||||||||||||||||||||||
| ...parsed, | ||||||||||||||||||||||||||||||
| tokens: executionOutput.tokens, | ||||||||||||||||||||||||||||||
| toolCalls: executionOutput.toolCalls, | ||||||||||||||||||||||||||||||
| providerTiming: executionOutput.providerTiming, | ||||||||||||||||||||||||||||||
| cost: executionOutput.cost, | ||||||||||||||||||||||||||||||
| model: executionOutput.model, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
| await streamingExec.onFullContent(fullContent) | ||||||||||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||||||||||
| this.execLogger.warn('Failed to parse streamed content for response format', { | ||||||||||||||||||||||||||||||
| blockId, | ||||||||||||||||||||||||||||||
| error, | ||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||
| this.execLogger.error('onFullContent callback failed', { blockId, error }) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
Comment on lines
+720
to
+726
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| executionOutput.content = fullContent | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.