diff --git a/apps/sim/lib/copilot/request/go/stream.test.ts b/apps/sim/lib/copilot/request/go/stream.test.ts index 8234658ddc..8d08f755d8 100644 --- a/apps/sim/lib/copilot/request/go/stream.test.ts +++ b/apps/sim/lib/copilot/request/go/stream.test.ts @@ -10,13 +10,24 @@ import { MothershipStreamV1ToolOutcome, MothershipStreamV1ToolPhase, } from '@/lib/copilot/generated/mothership-stream-v1' + +vi.mock('@/lib/copilot/request/session', async () => { + const actual = await vi.importActual( + '@/lib/copilot/request/session' + ) + return { + ...actual, + hasAbortMarker: vi.fn().mockResolvedValue(false), + } +}) + import { buildPreviewContentUpdate, decodeJsonStringPrefix, extractEditContent, runStreamLoop, } from '@/lib/copilot/request/go/stream' -import { createEvent } from '@/lib/copilot/request/session' +import { AbortReason, createEvent, hasAbortMarker } from '@/lib/copilot/request/session' import { RequestTraceV1Outcome, TraceCollector } from '@/lib/copilot/request/trace' import type { ExecutionContext, StreamingContext } from '@/lib/copilot/request/types' @@ -285,6 +296,137 @@ describe('copilot go stream helpers', () => { ).toBe(true) }) + it('reclassifies as aborted when the body closes without terminal but the abort marker is set', async () => { + const textEvent = createEvent({ + streamId: 'stream-1', + cursor: '1', + seq: 1, + requestId: 'req-1', + type: MothershipStreamV1EventType.text, + payload: { + channel: 'assistant', + text: 'partial response', + }, + }) + + vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent])) + vi.mocked(hasAbortMarker).mockResolvedValueOnce(true) + + const context = createStreamingContext() + const execContext: ExecutionContext = { + userId: 'user-1', + workflowId: 'workflow-1', + } + + await runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, { + timeout: 1000, + }) + + expect(hasAbortMarker).toHaveBeenCalledWith(context.messageId) + expect(context.wasAborted).toBe(true) + expect( + context.errors.some((message) => + message.includes('Copilot backend stream ended before a terminal event') + ) + ).toBe(false) + }) + + it('invokes onAbortObserved with MarkerObservedAtBodyClose when reclassifying via the abort marker', async () => { + const textEvent = createEvent({ + streamId: 'stream-1', + cursor: '1', + seq: 1, + requestId: 'req-1', + type: MothershipStreamV1EventType.text, + payload: { + channel: 'assistant', + text: 'partial response', + }, + }) + + vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent])) + vi.mocked(hasAbortMarker).mockResolvedValueOnce(true) + + const context = createStreamingContext() + const execContext: ExecutionContext = { + userId: 'user-1', + workflowId: 'workflow-1', + } + const onAbortObserved = vi.fn() + + await runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, { + timeout: 1000, + onAbortObserved, + }) + + expect(onAbortObserved).toHaveBeenCalledTimes(1) + expect(onAbortObserved).toHaveBeenCalledWith(AbortReason.MarkerObservedAtBodyClose) + expect(context.wasAborted).toBe(true) + }) + + it('does not invoke onAbortObserved when no abort marker is present at body close', async () => { + const textEvent = createEvent({ + streamId: 'stream-1', + cursor: '1', + seq: 1, + requestId: 'req-1', + type: MothershipStreamV1EventType.text, + payload: { + channel: 'assistant', + text: 'partial response', + }, + }) + + vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent])) + vi.mocked(hasAbortMarker).mockResolvedValueOnce(false) + + const context = createStreamingContext() + const execContext: ExecutionContext = { + userId: 'user-1', + workflowId: 'workflow-1', + } + const onAbortObserved = vi.fn() + + await expect( + runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, { + timeout: 1000, + onAbortObserved, + }) + ).rejects.toThrow('Copilot backend stream ended before a terminal event') + + expect(onAbortObserved).not.toHaveBeenCalled() + }) + + it('still fails closed when the body closes without terminal and the abort marker check throws', async () => { + const textEvent = createEvent({ + streamId: 'stream-1', + cursor: '1', + seq: 1, + requestId: 'req-1', + type: MothershipStreamV1EventType.text, + payload: { + channel: 'assistant', + text: 'partial response', + }, + }) + + vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent])) + vi.mocked(hasAbortMarker).mockRejectedValueOnce(new Error('redis unavailable')) + + const context = createStreamingContext() + const execContext: ExecutionContext = { + userId: 'user-1', + workflowId: 'workflow-1', + } + + await expect( + runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, { + timeout: 1000, + }) + ).rejects.toThrow('Copilot backend stream ended before a terminal event') + expect(context.wasAborted).toBe(false) + }) + it('fails closed when the shared stream receives an invalid event', async () => { vi.mocked(fetch).mockResolvedValueOnce( createSseResponse([ diff --git a/apps/sim/lib/copilot/request/go/stream.ts b/apps/sim/lib/copilot/request/go/stream.ts index c92d135aff..ddbf7678d6 100644 --- a/apps/sim/lib/copilot/request/go/stream.ts +++ b/apps/sim/lib/copilot/request/go/stream.ts @@ -30,7 +30,9 @@ import { } from '@/lib/copilot/request/handlers/types' import { getCopilotTracer } from '@/lib/copilot/request/otel' import { + AbortReason, eventToStreamEvent, + hasAbortMarker, isSubagentSpanStreamEvent, parsePersistedStreamEventEnvelope, } from '@/lib/copilot/request/session' @@ -436,16 +438,32 @@ export async function runStreamLoop( }) if (!context.streamComplete && !abortSignal?.aborted && !context.wasAborted) { - const streamPath = new URL(fetchUrl).pathname - const message = `Copilot backend stream ended before a terminal event on ${streamPath}` - context.errors.push(message) - logger.error('Copilot backend stream ended before a terminal event', { - path: streamPath, - requestId: context.requestId, - messageId: context.messageId, - }) - endedOn = CopilotSseCloseReason.ClosedNoTerminal - throw new CopilotBackendError(message, { status: 503 }) + let abortRequested = false + try { + abortRequested = await hasAbortMarker(context.messageId) + } catch (error) { + logger.warn('Failed to read abort marker at body close', { + streamId: context.messageId, + error: error instanceof Error ? error.message : String(error), + }) + } + + if (abortRequested) { + options.onAbortObserved?.(AbortReason.MarkerObservedAtBodyClose) + context.wasAborted = true + endedOn = CopilotSseCloseReason.Aborted + } else { + const streamPath = new URL(fetchUrl).pathname + const message = `Copilot backend stream ended before a terminal event on ${streamPath}` + context.errors.push(message) + logger.error('Copilot backend stream ended before a terminal event', { + path: streamPath, + requestId: context.requestId, + messageId: context.messageId, + }) + endedOn = CopilotSseCloseReason.ClosedNoTerminal + throw new CopilotBackendError(message, { status: 503 }) + } } } catch (error) { if (error instanceof FatalSseEventError && !context.errors.includes(error.message)) { diff --git a/apps/sim/lib/copilot/request/lifecycle/headless.ts b/apps/sim/lib/copilot/request/lifecycle/headless.ts index 6952782931..381e91a395 100644 --- a/apps/sim/lib/copilot/request/lifecycle/headless.ts +++ b/apps/sim/lib/copilot/request/lifecycle/headless.ts @@ -53,10 +53,10 @@ export async function runHeadlessCopilotLifecycle( simRequestId, otelContext, }) - outcome = options.abortSignal?.aborted - ? RequestTraceV1Outcome.cancelled - : result.success - ? RequestTraceV1Outcome.success + outcome = result.success + ? RequestTraceV1Outcome.success + : options.abortSignal?.aborted || result.cancelled + ? RequestTraceV1Outcome.cancelled : RequestTraceV1Outcome.error return result } catch (error) { diff --git a/apps/sim/lib/copilot/request/lifecycle/start.test.ts b/apps/sim/lib/copilot/request/lifecycle/start.test.ts index 5477fc9994..e27f2a2919 100644 --- a/apps/sim/lib/copilot/request/lifecycle/start.test.ts +++ b/apps/sim/lib/copilot/request/lifecycle/start.test.ts @@ -6,7 +6,10 @@ import { propagation, trace } from '@opentelemetry/api' import { W3CTraceContextPropagator } from '@opentelemetry/core' import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import { MothershipStreamV1EventType } from '@/lib/copilot/generated/mothership-stream-v1' +import { + MothershipStreamV1CompletionStatus, + MothershipStreamV1EventType, +} from '@/lib/copilot/generated/mothership-stream-v1' const { runCopilotLifecycle, @@ -60,6 +63,7 @@ vi.mock('@/lib/copilot/request/session', () => ({ registerActiveStream: vi.fn(), unregisterActiveStream: vi.fn(), startAbortPoller: vi.fn().mockReturnValue(setInterval(() => {}, 999999)), + isExplicitStopReason: vi.fn().mockReturnValue(false), SSE_RESPONSE_HEADERS: {}, StreamWriter: vi.fn().mockImplementation(() => ({ attach: vi.fn().mockImplementation((ctrl: ReadableStreamDefaultController) => { @@ -211,6 +215,46 @@ describe('createSSEStream terminal error handling', () => { expect(scheduleBufferCleanup).toHaveBeenCalledWith('stream-1') }) + it('publishes a cancelled completion (not an error) when the orchestrator reports cancelled without abortSignal aborted', async () => { + runCopilotLifecycle.mockResolvedValue({ + success: false, + cancelled: true, + content: '', + contentBlocks: [], + toolCalls: [], + }) + + const stream = createSSEStream({ + requestPayload: { message: 'hello' }, + userId: 'user-1', + streamId: 'stream-1', + executionId: 'exec-1', + runId: 'run-1', + currentChat: null, + isNewChat: false, + message: 'hello', + titleModel: 'gpt-5.4', + requestId: 'req-cancelled', + orchestrateOptions: {}, + }) + + await drainStream(stream) + + expect(appendEvent).not.toHaveBeenCalledWith( + expect.objectContaining({ + type: MothershipStreamV1EventType.error, + }) + ) + expect(appendEvent).toHaveBeenCalledWith( + expect.objectContaining({ + type: MothershipStreamV1EventType.complete, + payload: expect.objectContaining({ + status: MothershipStreamV1CompletionStatus.cancelled, + }), + }) + ) + }) + it('passes an OTel context into the streaming lifecycle', async () => { let lifecycleTraceparent = '' runCopilotLifecycle.mockImplementation(async (_payload, options) => { diff --git a/apps/sim/lib/copilot/request/lifecycle/start.ts b/apps/sim/lib/copilot/request/lifecycle/start.ts index 37d58624c1..d401855b9c 100644 --- a/apps/sim/lib/copilot/request/lifecycle/start.ts +++ b/apps/sim/lib/copilot/request/lifecycle/start.ts @@ -249,6 +249,11 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS onEvent: async (event) => { await publisher.publish(event) }, + onAbortObserved: (reason) => { + if (!abortController.signal.aborted) { + abortController.abort(reason) + } + }, }) lifecycleResult = result @@ -266,7 +271,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS // 3. Otherwise → error. outcome = result.success ? RequestTraceV1Outcome.success - : abortController.signal.aborted || publisher.clientDisconnected + : result.cancelled || abortController.signal.aborted || publisher.clientDisconnected ? RequestTraceV1Outcome.cancelled : RequestTraceV1Outcome.error if (outcome === RequestTraceV1Outcome.cancelled) { diff --git a/apps/sim/lib/copilot/request/session/abort-reason.ts b/apps/sim/lib/copilot/request/session/abort-reason.ts index fa850d63f1..8a6b281e2c 100644 --- a/apps/sim/lib/copilot/request/session/abort-reason.ts +++ b/apps/sim/lib/copilot/request/session/abort-reason.ts @@ -22,6 +22,12 @@ export const AbortReason = { * that the node that DID receive it wrote, and aborts on the poll. */ RedisPoller: 'redis_abort_marker:poller', + /** + * Cross-process stop: same root cause as `RedisPoller`, but observed + * by `runStreamLoop` at body close (the Go body ended before the + * 250ms poller's next tick) rather than by the polling timer. + */ + MarkerObservedAtBodyClose: 'redis_abort_marker:body_close', /** Internal timeout on the outbound explicit-abort fetch to Go. */ ExplicitAbortFetchTimeout: 'timeout:go_explicit_abort_fetch', } as const @@ -38,5 +44,9 @@ export type AbortReasonValue = (typeof AbortReason)[keyof typeof AbortReason] * stops, mirroring `requestctx.IsExplicitUserStop` on the Go side. */ export function isExplicitStopReason(reason: unknown): boolean { - return reason === AbortReason.UserStop || reason === AbortReason.RedisPoller + return ( + reason === AbortReason.UserStop || + reason === AbortReason.RedisPoller || + reason === AbortReason.MarkerObservedAtBodyClose + ) } diff --git a/apps/sim/lib/copilot/request/session/abort.test.ts b/apps/sim/lib/copilot/request/session/abort.test.ts index 9c6ee82aa0..bdfd5d39cb 100644 --- a/apps/sim/lib/copilot/request/session/abort.test.ts +++ b/apps/sim/lib/copilot/request/session/abort.test.ts @@ -98,6 +98,47 @@ describe('startAbortPoller heartbeat', () => { } }) + it('aborts the controller before clearing the marker so the marker is never observable as cleared while the signal is still unaborted', async () => { + const controller = new AbortController() + const streamId = 'stream-order-1' + + let signalAbortedWhenMarkerCleared: boolean | null = null + mockClearAbortMarker.mockImplementationOnce(async () => { + signalAbortedWhenMarkerCleared = controller.signal.aborted + }) + mockHasAbortMarker.mockResolvedValueOnce(true) + + const interval = startAbortPoller(streamId, controller, {}) + + try { + await vi.advanceTimersByTimeAsync(300) + + expect(mockClearAbortMarker).toHaveBeenCalledWith(streamId) + expect(signalAbortedWhenMarkerCleared).toBe(true) + expect(controller.signal.aborted).toBe(true) + } finally { + clearInterval(interval) + } + }) + + it('does not clear the marker when the signal is already aborted (no double abort)', async () => { + const controller = new AbortController() + controller.abort('preexisting') + const streamId = 'stream-order-2' + + mockHasAbortMarker.mockResolvedValueOnce(true) + + const interval = startAbortPoller(streamId, controller, {}) + + try { + await vi.advanceTimersByTimeAsync(300) + + expect(mockClearAbortMarker).not.toHaveBeenCalled() + } finally { + clearInterval(interval) + } + }) + it('stops heartbeating after ownership is lost', async () => { const controller = new AbortController() const streamId = 'stream-lost' diff --git a/apps/sim/lib/copilot/request/session/abort.ts b/apps/sim/lib/copilot/request/session/abort.ts index db3beff57e..ce50869036 100644 --- a/apps/sim/lib/copilot/request/session/abort.ts +++ b/apps/sim/lib/copilot/request/session/abort.ts @@ -17,7 +17,7 @@ const pendingChatStreams = new Map< { promise: Promise; resolve: () => void; streamId: string } >() -const DEFAULT_ABORT_POLL_MS = 1000 +const DEFAULT_ABORT_POLL_MS = 250 /** * TTL for the per-chat stream lock. Kept short so that if the Sim pod diff --git a/apps/sim/lib/copilot/request/types.ts b/apps/sim/lib/copilot/request/types.ts index bbb6264fd8..d2fff5be35 100644 --- a/apps/sim/lib/copilot/request/types.ts +++ b/apps/sim/lib/copilot/request/types.ts @@ -136,6 +136,14 @@ export interface OrchestratorOptions { onComplete?: (result: OrchestratorResult) => void | Promise onError?: (error: Error) => void | Promise abortSignal?: AbortSignal + /** + * Invoked when the orchestrator infers that the run was aborted via + * an out-of-band signal (currently: a Redis abort marker observed + * at SSE body close). Callers wire this to fire their local + * `AbortController` so `signal.reason` is set and `recordCancelled` + * classifies as `explicit_stop` rather than `unknown`. + */ + onAbortObserved?: (reason: string) => void interactive?: boolean }