Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/fix-realtime-fetch-signal-leak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Fix an RSS memory leak in the realtime proxy routes. `/realtime/v1/runs`, `/realtime/v1/runs/:id`, and `/realtime/v1/batches/:id` called `fetch()` into Electric with no abort signal, so when a client disconnected mid-long-poll, undici kept the upstream socket open and continued buffering response chunks that would never be consumed. That memory was retained only in RSS and was therefore invisible to V8 heap tooling. The fix threads `getRequestAbortSignal()` through `RealtimeClient.streamRun/streamRuns/streamBatch` to `longPollingFetch` and cancels the upstream body in the error path. An isolated reproducer showed ~44 KB retained per leaked request; with signal propagation in place, that memory is released cleanly.
Comment thread
ericallam marked this conversation as resolved.
4 changes: 3 additions & 1 deletion apps/webapp/app/routes/realtime.v1.batches.$batchId.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

Expand Down Expand Up @@ -33,7 +34,8 @@ export const loader = createLoaderApiRoute(
batchRun.id,
apiVersion,
authentication.realtime,
request.headers.get("x-trigger-electric-version") ?? undefined
request.headers.get("x-trigger-electric-version") ?? undefined,
getRequestAbortSignal()
);
}
);
8 changes: 7 additions & 1 deletion apps/webapp/app/routes/realtime.v1.runs.$runId.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

Expand Down Expand Up @@ -46,7 +47,12 @@ export const loader = createLoaderApiRoute(
run.id,
apiVersion,
authentication.realtime,
request.headers.get("x-trigger-electric-version") ?? undefined
request.headers.get("x-trigger-electric-version") ?? undefined,
// Propagate abort on client disconnect so the upstream Electric long-poll
// fetch is cancelled too. Without this, undici buffers from the unconsumed
// upstream response body accumulate until Electric's poll timeout, causing
// steady RSS growth on api (see docs/runbooks for the H1 isolation test).
getRequestAbortSignal()
);
}
);
4 changes: 3 additions & 1 deletion apps/webapp/app/routes/realtime.v1.runs.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { z } from "zod";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

Expand Down Expand Up @@ -31,7 +32,8 @@ export const loader = createLoaderApiRoute(
searchParams,
apiVersion,
authentication.realtime,
request.headers.get("x-trigger-electric-version") ?? undefined
request.headers.get("x-trigger-electric-version") ?? undefined,
getRequestAbortSignal()
);
}
);
23 changes: 15 additions & 8 deletions apps/webapp/app/services/realtimeClient.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,17 @@ export class RealtimeClient {
runId: string,
apiVersion: API_VERSIONS,
requestOptions?: RealtimeRequestOptions,
clientVersion?: string
clientVersion?: string,
signal?: AbortSignal
) {
return this.#streamRunsWhere(
url,
environment,
`id='${runId}'`,
apiVersion,
requestOptions,
clientVersion
clientVersion,
signal
);
}

Expand All @@ -133,7 +135,8 @@ export class RealtimeClient {
batchId: string,
apiVersion: API_VERSIONS,
requestOptions?: RealtimeRequestOptions,
clientVersion?: string
clientVersion?: string,
signal?: AbortSignal
) {
const whereClauses: string[] = [
`"runtimeEnvironmentId"='${environment.id}'`,
Expand All @@ -148,7 +151,8 @@ export class RealtimeClient {
whereClause,
apiVersion,
requestOptions,
clientVersion
clientVersion,
signal
);
}

Expand All @@ -158,7 +162,8 @@ export class RealtimeClient {
params: RealtimeRunsParams,
apiVersion: API_VERSIONS,
requestOptions?: RealtimeRequestOptions,
clientVersion?: string
clientVersion?: string,
signal?: AbortSignal
) {
const whereClauses: string[] = [`"runtimeEnvironmentId"='${environment.id}'`];

Expand All @@ -180,7 +185,8 @@ export class RealtimeClient {
whereClause,
apiVersion,
requestOptions,
clientVersion
clientVersion,
signal
);

if (createdAtFilter) {
Expand Down Expand Up @@ -274,7 +280,8 @@ export class RealtimeClient {
whereClause: string,
apiVersion: API_VERSIONS,
requestOptions?: RealtimeRequestOptions,
clientVersion?: string
clientVersion?: string,
signal?: AbortSignal
) {
const electricUrl = this.#constructRunsElectricUrl(
url,
Expand All @@ -288,7 +295,7 @@ export class RealtimeClient {
electricUrl,
environment,
apiVersion,
undefined,
signal,
clientVersion
);
}
Expand Down
18 changes: 14 additions & 4 deletions apps/webapp/app/utils/longPollingFetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ export async function longPollingFetch(
options?: RequestInit,
rewriteResponseHeaders?: Record<string, string>
) {
let upstream: Response | undefined;
try {
let response = await fetch(url, options);
upstream = await fetch(url, options);
let response = upstream;

if (response.headers.get("content-encoding")) {
const headers = new Headers(response.headers);
Expand Down Expand Up @@ -46,16 +48,24 @@ export async function longPollingFetch(

return response;
} catch (error) {
// Release upstream undici socket + buffers explicitly. Without this the
// ReadableStream stays open and undici keeps buffering chunks into memory
// until the upstream times out (see H1 isolation test — ~44 KB retained
// per unconsumed-body fetch in RSS).
try { await upstream?.body?.cancel(); } catch {}

// AbortError is the expected path when downstream disconnects with a
// propagated signal — treat as a clean client-close, not a server error.
if (error instanceof Error && error.name === "AbortError") {
throw new Response(null, { status: 499 });
}
if (error instanceof TypeError) {
// Network error or other fetch-related errors
logger.error("Network error:", { error: error.message });
throw new Response("Network error occurred", { status: 503 });
} else if (error instanceof Error) {
// HTTP errors or other known errors
logger.error("Fetch error:", { error: error.message });
throw new Response(error.message, { status: 500 });
} else {
// Unknown errors
logger.error("Unknown error occurred during fetch");
throw new Response("An unknown error occurred", { status: 500 });
}
Expand Down
Loading