Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
79dd2da
feat(testcontainers): schemaOnlyPrisma fixture - empty template clone…
d-cs Jun 10, 2026
d44f4f3
test(run-engine): cover getSnapshotsSince replica-miss fallback (red)
d-cs Jun 10, 2026
516533c
test(run-engine): tie-proof replica fallback window assertion, drop r…
d-cs Jun 10, 2026
ade8c55
feat(run-engine): typed ExecutionSnapshotNotFoundError from getExecut…
d-cs Jun 10, 2026
73d5c55
feat(run-engine): retry getSnapshotsSince on the primary when the rep…
d-cs Jun 10, 2026
d680da8
fix(run-engine): only count replica misses the primary can serve; ski…
d-cs Jun 10, 2026
6651568
chore: server-changes note for snapshots-since replica fallback
d-cs Jun 10, 2026
a868b3a
test(run-engine): cover replica-read, stale-tail, flag-off, tx bypass…
d-cs Jun 10, 2026
9737060
fix(run-engine): mark replica-retry failures in logs, narrow release …
d-cs Jun 10, 2026
a5d59a1
test(run-engine): getSnapshotsSince must reject a since snapshot from…
d-cs Jun 10, 2026
1c128bd
fix(run-engine): scope getSnapshotsSince anchor lookup to the polled run
d-cs Jun 10, 2026
4f98828
chore(run-engine): trim assertion-echo comments, tighten fallback com…
d-cs Jun 10, 2026
01675a8
test(run-engine): replica catches up during jittered retry window (red)
d-cs Jun 10, 2026
b4625be
feat(run-engine): jittered replica retry before primary fallback in g…
d-cs Jun 10, 2026
6673a94
chore(run-engine): harden retry-window test against cold connects, di…
d-cs Jun 10, 2026
1157227
chore: unwrap server-changes notes to one line per paragraph
d-cs Jun 10, 2026
f4dac20
fix(run-engine,testcontainers): address review - normalize retry boun…
d-cs Jun 10, 2026
f26b646
docs(run-engine): document maxMs <= 0 (not just 0) as disabling the r…
d-cs Jun 10, 2026
51924bb
chore: combine the two snapshots-since server-changes notes into one …
d-cs Jun 10, 2026
3a5e88b
docs(run-engine): clarify that only not-found replica errors trigger …
d-cs Jun 10, 2026
8418c7b
chore: retrigger CI
d-cs Jun 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/snapshots-since-replica-primary-fallback.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

When run snapshot polling reads from the database read replica (`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`), it no longer errors or pays extra latency if the replica hasn't yet replicated the snapshot the runner is polling from: the read is briefly retried on the replica and, if the replica still hasn't caught up, served from the primary. Polling also now rejects a since-snapshot id that doesn't belong to the run being polled.
2 changes: 2 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,8 @@ const EnvironmentSchema = z
.default("info"),
RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM: z.string().default("0"),
RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED: z.string().default("0"),
RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS: z.coerce.number().int().default(50),
RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS: z.coerce.number().int().default(200),
RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ: z.string().default("0"),

/** How long should the presence ttl last */
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ function createRunEngine() {
env.RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM === "1",
readReplicaSnapshotsSinceEnabled:
env.RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED === "1",
readReplicaSnapshotsSinceRetryDelay: {
minMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS,
maxMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS,
},
worker: {
disabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
workers: env.RUN_ENGINE_WORKER_COUNT,
Expand Down
1 change: 1 addition & 0 deletions internal-packages/run-engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
},
"devDependencies": {
"@internal/testcontainers": "workspace:*",
"@opentelemetry/sdk-metrics": "2.7.1",
"@types/seedrandom": "^3.0.8",
"rimraf": "6.0.1"
},
Expand Down
7 changes: 7 additions & 0 deletions internal-packages/run-engine/src/engine/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,10 @@ export class RunOneTimeUseTokenError extends Error {
this.name = "RunOneTimeUseTokenError";
}
}

/**
 * Error raised when an execution snapshot lookup by id yields no row.
 *
 * Being a dedicated subclass, it lets callers tell a "not found" outcome
 * apart from any other failure with an `instanceof` check.
 */
export class ExecutionSnapshotNotFoundError extends Error {
  /** Id of the snapshot that could not be found. */
  public readonly snapshotId: string;

  constructor(snapshotId: string) {
    super(`No execution snapshot found for id ${snapshotId}`);
    this.snapshotId = snapshotId;
    this.name = "ExecutionSnapshotNotFoundError";
  }
}
92 changes: 86 additions & 6 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createRedisClient, Redis } from "@internal/redis";
import { getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing";
import { type Counter, getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing";
import { Logger } from "@trigger.dev/core/logger";
import {
CheckpointInput,
Expand Down Expand Up @@ -33,6 +33,7 @@ import {
import { Worker } from "@trigger.dev/redis-worker";
import { assertNever } from "assert-never";
import { EventEmitter } from "node:events";
import { setTimeout } from "node:timers/promises";
import { BatchQueue } from "../batch-queue/index.js";
import type {
BatchItem,
Expand All @@ -46,7 +47,12 @@ import { RunQueue } from "../run-queue/index.js";
import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js";
import { AuthenticatedEnvironment, MinimalAuthenticatedEnvironment } from "../shared/index.js";
import { BillingCache } from "./billingCache.js";
import { NotImplementedError, RunDuplicateIdempotencyKeyError, RunOneTimeUseTokenError } from "./errors.js";
import {
ExecutionSnapshotNotFoundError,
NotImplementedError,
RunDuplicateIdempotencyKeyError,
RunOneTimeUseTokenError,
} from "./errors.js";
import { EventBus, EventBusEvents } from "./eventBus.js";
import { RunLocker } from "./locking.js";
import { getFinalRunStatuses } from "./statuses.js";
Expand Down Expand Up @@ -88,6 +94,8 @@ export class RunEngine {
private logger: Logger;
private tracer: Tracer;
private meter: Meter;
private snapshotsSinceReplicaMissCounter: Counter;
private snapshotsSinceReplicaRetryDelay: { minMs: number; maxMs: number };
private heartbeatTimeouts: HeartbeatTimeouts;
private repairSnapshotTimeoutMs: number;
private batchQueue: BatchQueue;
Expand Down Expand Up @@ -272,6 +280,22 @@ export class RunEngine {
this.tracer = options.tracer;
this.meter = options.meter ?? getMeter("run-engine");

this.snapshotsSinceReplicaMissCounter = this.meter.createCounter(
"run_engine.snapshots_since.replica_miss",
{
description:
"getSnapshotsSince reads where the since snapshot was not yet on the read replica, recovered via a replica retry or served from the primary",
}
);

// Normalize the bounds, but keep maxMs <= 0 meaning "skip the replica retry".
const retryDelay = options.readReplicaSnapshotsSinceRetryDelay ?? { minMs: 50, maxMs: 200 };
const retryMinMs = Math.max(0, retryDelay.minMs);
this.snapshotsSinceReplicaRetryDelay = {
minMs: retryDelay.maxMs > 0 ? Math.min(retryMinMs, retryDelay.maxMs) : retryMinMs,
maxMs: retryDelay.maxMs,
};

const defaultHeartbeatTimeouts: HeartbeatTimeouts = {
PENDING_EXECUTING: 60_000,
PENDING_CANCEL: 60_000,
Expand Down Expand Up @@ -1918,13 +1942,69 @@ export class RunEngine {
snapshotId: string;
tx?: PrismaClientOrTransaction;
}): Promise<RunExecutionData[] | null> {
const prisma =
tx ?? (this.options.readReplicaSnapshotsSinceEnabled ? this.readOnlyPrisma : this.prisma);
const useReplica =
!tx &&
this.options.readReplicaSnapshotsSinceEnabled === true &&
this.readOnlyPrisma !== this.prisma;
const prisma = tx ?? (useReplica ? this.readOnlyPrisma : this.prisma);

const query = async (client: PrismaClientOrTransaction) => {
const snapshots = await getExecutionSnapshotsSince(client, runId, snapshotId);
return snapshots.map(executionDataFromSnapshot);
};

try {
const snapshots = await getExecutionSnapshotsSince(prisma, runId, snapshotId);
return snapshots.map(executionDataFromSnapshot);
return await query(prisma);
} catch (e) {
if (useReplica && e instanceof ExecutionSnapshotNotFoundError) {
// Replica lag: the runner learned this snapshot id from the writer before the
// replica caught up. Give the replica one jittered retry; if it's still missing,
// serve from the writer. Only not-found errors get this treatment - any other
// replica failure stays an error rather than shifting read load to the writer.
// A miss on the writer too is a real error, not lag.
const { minMs, maxMs } = this.snapshotsSinceReplicaRetryDelay;
if (maxMs > 0) {
await setTimeout(minMs + Math.random() * Math.max(0, maxMs - minMs));
try {
const result = await query(this.readOnlyPrisma);
this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "replica_retry" });
return result;
} catch (replicaRetryError) {
if (!(replicaRetryError instanceof ExecutionSnapshotNotFoundError)) {
this.logger.error("Failed to getSnapshotsSince", {
message:
replicaRetryError instanceof Error
? replicaRetryError.message
: replicaRetryError,
runId,
snapshotId,
failedDuring: "replica_retry",
});
return null;
}
// still not on the replica - fall through to the primary
}
Comment thread
d-cs marked this conversation as resolved.
}

try {
const result = await query(this.prisma);
this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "primary" });
this.logger.warn("getSnapshotsSince: snapshot not yet on replica, served from primary", {
runId,
snapshotId,
});
return result;
} catch (retryError) {
this.logger.error("Failed to getSnapshotsSince", {
message: retryError instanceof Error ? retryError.message : retryError,
runId,
snapshotId,
failedDuring: "primary_fallback",
});
return null;
}
}

this.logger.error("Failed to getSnapshotsSince", {
message: e instanceof Error ? e.message : e,
runId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
TaskRunStatus,
Waitpoint,
} from "@trigger.dev/database";
import { ExecutionSnapshotNotFoundError } from "../errors.js";
import { HeartbeatTimeouts } from "../types.js";
import { SystemResources } from "./systems.js";

Expand Down Expand Up @@ -273,12 +274,12 @@ export async function getExecutionSnapshotsSince(
): Promise<EnhancedExecutionSnapshot[]> {
// Step 1: Find the createdAt of the sinceSnapshotId
const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findFirst({
where: { id: sinceSnapshotId },
where: { id: sinceSnapshotId, runId },
select: { createdAt: true },
});

if (!sinceSnapshot) {
throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`);
throw new ExecutionSnapshotNotFoundError(sinceSnapshotId);
}

// Step 2: Fetch snapshots WITHOUT waitpoints to avoid N×M data explosion
Expand Down
Loading