From 586315bb6d5f6a4e3bf2e50c9f7c764e9e1d79fa Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 24 Apr 2026 14:53:20 +0100 Subject: [PATCH] fix(webapp): propagate abort signal through realtime proxy fetch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The three high-traffic realtime proxy routes (/realtime/v1/runs, /realtime/v1/runs/:id, /realtime/v1/batches/:id) all route through RealtimeClient.streamRun/streamRuns/streamBatch -> #streamRunsWhere -> #performElectricRequest -> longPollingFetch(url, {signal}). The #streamRunsWhere caller hardcoded signal=undefined, so the upstream fetch to Electric had no abort signal. When a downstream client disconnected mid long-poll, undici kept the upstream socket open and continued buffering response chunks that would never be read, until Electric's own poll timeout elapsed (up to ~20s). The buffered bytes live in native memory below V8's accounting, so the retention shows up only in RSS — invisible to heap snapshots. Thread a signal parameter through streamRun/streamRuns/streamBatch (and the shared #streamRunsWhere) and pass getRequestAbortSignal() from each of the three route handlers. Also cancel the upstream body explicitly in longPollingFetch's error path and treat AbortError as a clean client-close (499) rather than a 500, matching the semantic of 'downstream went away'. Verified in an isolated standalone reproducer (fetch-a-slow-upstream pattern, 5 rounds of 200 parallel fetches, burst-and-discard): A: no signal, body never consumed Δrss=+59.4 MB B: signal propagated, abort on close Δrss=+15.4 MB (plateaus) C: no signal, res.body.cancel() Δrss=-25.4 MB Sustained 10-round test with B: RSS oscillates in a 49-65 MB band with no upward trend -> the signal propagation fully releases the undici buffers; the +15 MB residual in the single-round test was one-time allocator overhead, not accumulation. --- .../fix-realtime-fetch-signal-leak.md | 6 +++++ .../routes/realtime.v1.batches.$batchId.ts | 4 +++- .../app/routes/realtime.v1.runs.$runId.ts | 8 ++++++- apps/webapp/app/routes/realtime.v1.runs.ts | 4 +++- .../app/services/realtimeClient.server.ts | 23 ++++++++++++------- apps/webapp/app/utils/longPollingFetch.ts | 18 +++++++++++---- 6 files changed, 48 insertions(+), 15 deletions(-) create mode 100644 .server-changes/fix-realtime-fetch-signal-leak.md diff --git a/.server-changes/fix-realtime-fetch-signal-leak.md b/.server-changes/fix-realtime-fetch-signal-leak.md new file mode 100644 index 00000000000..ac681a301f6 --- /dev/null +++ b/.server-changes/fix-realtime-fetch-signal-leak.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Fix RSS memory leak in the realtime proxy routes. `/realtime/v1/runs`, `/realtime/v1/runs/:id`, and `/realtime/v1/batches/:id` called `fetch()` into Electric with no abort signal, so when a client disconnected mid long-poll, undici kept the upstream socket open and buffered response chunks that would never be consumed — retained only in RSS, invisible to V8 heap tooling. Thread `getRequestAbortSignal()` through `RealtimeClient.streamRun/streamRuns/streamBatch` to `longPollingFetch` and cancel the upstream body in the error path. Isolated reproducer showed ~44 KB retained per leaked request; signal propagation releases it cleanly. diff --git a/apps/webapp/app/routes/realtime.v1.batches.$batchId.ts b/apps/webapp/app/routes/realtime.v1.batches.$batchId.ts index 17a759e6ca0..33449deebca 100644 --- a/apps/webapp/app/routes/realtime.v1.batches.$batchId.ts +++ b/apps/webapp/app/routes/realtime.v1.batches.$batchId.ts @@ -1,5 +1,6 @@ import { z } from "zod"; import { $replica } from "~/db.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { realtimeClient } from "~/services/realtimeClientGlobal.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; @@ -33,7 +34,8 @@ export const loader = createLoaderApiRoute( batchRun.id, apiVersion, authentication.realtime, - request.headers.get("x-trigger-electric-version") ?? undefined + request.headers.get("x-trigger-electric-version") ?? undefined, + getRequestAbortSignal() ); } ); diff --git a/apps/webapp/app/routes/realtime.v1.runs.$runId.ts b/apps/webapp/app/routes/realtime.v1.runs.$runId.ts index 35a34b01b4c..060f937b0eb 100644 --- a/apps/webapp/app/routes/realtime.v1.runs.$runId.ts +++ b/apps/webapp/app/routes/realtime.v1.runs.$runId.ts @@ -1,6 +1,7 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; import { $replica } from "~/db.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { realtimeClient } from "~/services/realtimeClientGlobal.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; @@ -46,7 +47,12 @@ export const loader = createLoaderApiRoute( run.id, apiVersion, authentication.realtime, - request.headers.get("x-trigger-electric-version") ?? undefined + request.headers.get("x-trigger-electric-version") ?? undefined, + // Propagate abort on client disconnect so the upstream Electric long-poll + // fetch is cancelled too. Without this, undici buffers from the unconsumed + // upstream response body accumulate until Electric's poll timeout, causing + // steady RSS growth on api (see docs/runbooks for the H1 isolation test). + getRequestAbortSignal() ); } ); diff --git a/apps/webapp/app/routes/realtime.v1.runs.ts b/apps/webapp/app/routes/realtime.v1.runs.ts index 1819265ee7f..18eeeb0a075 100644 --- a/apps/webapp/app/routes/realtime.v1.runs.ts +++ b/apps/webapp/app/routes/realtime.v1.runs.ts @@ -1,4 +1,5 @@ import { z } from "zod"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { realtimeClient } from "~/services/realtimeClientGlobal.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; @@ -31,7 +32,8 @@ export const loader = createLoaderApiRoute( searchParams, apiVersion, authentication.realtime, - request.headers.get("x-trigger-electric-version") ?? undefined + request.headers.get("x-trigger-electric-version") ?? undefined, + getRequestAbortSignal() ); } ); diff --git a/apps/webapp/app/services/realtimeClient.server.ts b/apps/webapp/app/services/realtimeClient.server.ts index d962a57426e..12b93f1996d 100644 --- a/apps/webapp/app/services/realtimeClient.server.ts +++ b/apps/webapp/app/services/realtimeClient.server.ts @@ -115,7 +115,8 @@ export class RealtimeClient { runId: string, apiVersion: API_VERSIONS, requestOptions?: RealtimeRequestOptions, - clientVersion?: string + clientVersion?: string, + signal?: AbortSignal ) { return this.#streamRunsWhere( url, @@ -123,7 +124,8 @@ export class RealtimeClient { `id='${runId}'`, apiVersion, requestOptions, - clientVersion + clientVersion, + signal ); } @@ -133,7 +135,8 @@ export class RealtimeClient { batchId: string, apiVersion: API_VERSIONS, requestOptions?: RealtimeRequestOptions, - clientVersion?: string + clientVersion?: string, + signal?: AbortSignal ) { const whereClauses: string[] = [ `"runtimeEnvironmentId"='${environment.id}'`, @@ -148,7 +151,8 @@ export class RealtimeClient { whereClause, apiVersion, requestOptions, - clientVersion + clientVersion, + signal ); } @@ -158,7 +162,8 @@ export class RealtimeClient { params: RealtimeRunsParams, apiVersion: API_VERSIONS, requestOptions?: RealtimeRequestOptions, - clientVersion?: string + clientVersion?: string, + signal?: AbortSignal ) { const whereClauses: string[] = [`"runtimeEnvironmentId"='${environment.id}'`]; @@ -180,7 +185,8 @@ export class RealtimeClient { whereClause, apiVersion, requestOptions, - clientVersion + clientVersion, + signal ); if (createdAtFilter) { @@ -274,7 +280,8 @@ export class RealtimeClient { whereClause: string, apiVersion: API_VERSIONS, requestOptions?: RealtimeRequestOptions, - clientVersion?: string + clientVersion?: string, + signal?: AbortSignal ) { const electricUrl = this.#constructRunsElectricUrl( url, @@ -288,7 +295,7 @@ export class RealtimeClient { electricUrl, environment, apiVersion, - undefined, + signal, clientVersion ); } diff --git a/apps/webapp/app/utils/longPollingFetch.ts b/apps/webapp/app/utils/longPollingFetch.ts index ec7e309180d..cb5f97693b7 100644 --- a/apps/webapp/app/utils/longPollingFetch.ts +++ b/apps/webapp/app/utils/longPollingFetch.ts @@ -11,8 +11,10 @@ export async function longPollingFetch( options?: RequestInit, rewriteResponseHeaders?: Record ) { + let upstream: Response | undefined; try { - let response = await fetch(url, options); + upstream = await fetch(url, options); + let response = upstream; if (response.headers.get("content-encoding")) { const headers = new Headers(response.headers); @@ -46,16 +48,24 @@ export async function longPollingFetch( return response; } catch (error) { + // Release upstream undici socket + buffers explicitly. Without this the + // ReadableStream stays open and undici keeps buffering chunks into memory + // until the upstream times out (see H1 isolation test — ~44 KB retained + // per unconsumed-body fetch in RSS). + try { await upstream?.body?.cancel(); } catch {} + + // AbortError is the expected path when downstream disconnects with a + // propagated signal — treat as a clean client-close, not a server error. + if (error instanceof Error && error.name === "AbortError") { + throw new Response(null, { status: 499 }); + } if (error instanceof TypeError) { - // Network error or other fetch-related errors logger.error("Network error:", { error: error.message }); throw new Response("Network error occurred", { status: 503 }); } else if (error instanceof Error) { - // HTTP errors or other known errors logger.error("Fetch error:", { error: error.message }); throw new Response(error.message, { status: 500 }); } else { - // Unknown errors logger.error("Unknown error occurred during fetch"); throw new Response("An unknown error occurred", { status: 500 }); }