diff --git a/.server-changes/snapshots-since-replica-primary-fallback.md b/.server-changes/snapshots-since-replica-primary-fallback.md new file mode 100644 index 00000000000..9b8257f6410 --- /dev/null +++ b/.server-changes/snapshots-since-replica-primary-fallback.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Run snapshot polling no longer errors or pays extra latency when the database read replica hasn't yet replicated the snapshot the runner is polling from (`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is briefly retried on the replica and served from the primary if it still hasn't caught up. Polling also now rejects a since-snapshot id that doesn't belong to the run being polled. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index c55bb424001..d0e7e39471b 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -950,6 +950,8 @@ const EnvironmentSchema = z .default("info"), RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM: z.string().default("0"), RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED: z.string().default("0"), + RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS: z.coerce.number().int().default(50), + RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS: z.coerce.number().int().default(200), RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ: z.string().default("0"), /** How long should the presence ttl last */ diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 7e96c5184b2..e2c8ad85e94 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -22,6 +22,10 @@ function createRunEngine() { env.RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM === "1", readReplicaSnapshotsSinceEnabled: env.RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED === "1", + readReplicaSnapshotsSinceRetryDelay: { + minMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS, + maxMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS, + }, worker: { disabled: env.RUN_ENGINE_WORKER_ENABLED === "0", workers: env.RUN_ENGINE_WORKER_COUNT, diff --git a/internal-packages/run-engine/package.json b/internal-packages/run-engine/package.json index 680d385ca4e..8b876a1aab6 100644 --- a/internal-packages/run-engine/package.json +++ b/internal-packages/run-engine/package.json @@ -35,6 +35,7 @@ }, "devDependencies": { "@internal/testcontainers": "workspace:*", + "@opentelemetry/sdk-metrics": "2.7.1", "@types/seedrandom": "^3.0.8", "rimraf": "6.0.1" }, diff --git a/internal-packages/run-engine/src/engine/errors.ts b/internal-packages/run-engine/src/engine/errors.ts index 9a41cba11ee..2bb05a304c9 100644 --- a/internal-packages/run-engine/src/engine/errors.ts +++ b/internal-packages/run-engine/src/engine/errors.ts @@ -104,3 +104,10 @@ export class RunOneTimeUseTokenError extends Error { this.name = "RunOneTimeUseTokenError"; } } + +export class ExecutionSnapshotNotFoundError extends Error { + constructor(public readonly snapshotId: string) { + super(`No execution snapshot found for id ${snapshotId}`); + this.name = "ExecutionSnapshotNotFoundError"; + } +} diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 835ff90cc48..22c21cb2881 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1,5 +1,5 @@ import { createRedisClient, Redis } from "@internal/redis"; -import { getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing"; +import { type Counter, getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; import { CheckpointInput, @@ -33,6 +33,7 @@ import { import { Worker } from "@trigger.dev/redis-worker"; import { assertNever } from "assert-never"; import { EventEmitter } from "node:events"; +import { setTimeout } from "node:timers/promises"; import { BatchQueue } from "../batch-queue/index.js"; import type { BatchItem, @@ -46,7 +47,12 @@ import { RunQueue } from "../run-queue/index.js"; import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js"; import { AuthenticatedEnvironment, MinimalAuthenticatedEnvironment } from "../shared/index.js"; import { BillingCache } from "./billingCache.js"; -import { NotImplementedError, RunDuplicateIdempotencyKeyError, RunOneTimeUseTokenError } from "./errors.js"; +import { + ExecutionSnapshotNotFoundError, + NotImplementedError, + RunDuplicateIdempotencyKeyError, + RunOneTimeUseTokenError, +} from "./errors.js"; import { EventBus, EventBusEvents } from "./eventBus.js"; import { RunLocker } from "./locking.js"; import { getFinalRunStatuses } from "./statuses.js"; @@ -88,6 +94,8 @@ export class RunEngine { private logger: Logger; private tracer: Tracer; private meter: Meter; + private snapshotsSinceReplicaMissCounter: Counter; + private snapshotsSinceReplicaRetryDelay: { minMs: number; maxMs: number }; private heartbeatTimeouts: HeartbeatTimeouts; private repairSnapshotTimeoutMs: number; private batchQueue: BatchQueue; @@ -272,6 +280,22 @@ export class RunEngine { this.tracer = options.tracer; this.meter = options.meter ?? getMeter("run-engine"); + this.snapshotsSinceReplicaMissCounter = this.meter.createCounter( + "run_engine.snapshots_since.replica_miss", + { + description: + "getSnapshotsSince reads where the since snapshot was not yet on the read replica, recovered via a replica retry or served from the primary", + } + ); + + // Normalize the bounds, but keep maxMs <= 0 meaning "skip the replica retry". + const retryDelay = options.readReplicaSnapshotsSinceRetryDelay ?? { minMs: 50, maxMs: 200 }; + const retryMinMs = Math.max(0, retryDelay.minMs); + this.snapshotsSinceReplicaRetryDelay = { + minMs: retryDelay.maxMs > 0 ? Math.min(retryMinMs, retryDelay.maxMs) : retryMinMs, + maxMs: retryDelay.maxMs, + }; + const defaultHeartbeatTimeouts: HeartbeatTimeouts = { PENDING_EXECUTING: 60_000, PENDING_CANCEL: 60_000, @@ -1918,13 +1942,69 @@ export class RunEngine { snapshotId: string; tx?: PrismaClientOrTransaction; }): Promise { - const prisma = - tx ?? (this.options.readReplicaSnapshotsSinceEnabled ? this.readOnlyPrisma : this.prisma); + const useReplica = + !tx && + this.options.readReplicaSnapshotsSinceEnabled === true && + this.readOnlyPrisma !== this.prisma; + const prisma = tx ?? (useReplica ? this.readOnlyPrisma : this.prisma); + + const query = async (client: PrismaClientOrTransaction) => { + const snapshots = await getExecutionSnapshotsSince(client, runId, snapshotId); + return snapshots.map(executionDataFromSnapshot); + }; try { - const snapshots = await getExecutionSnapshotsSince(prisma, runId, snapshotId); - return snapshots.map(executionDataFromSnapshot); + return await query(prisma); } catch (e) { + if (useReplica && e instanceof ExecutionSnapshotNotFoundError) { + // Replica lag: the runner learned this snapshot id from the writer before the + // replica caught up. Give the replica one jittered retry; if it's still missing, + // serve from the writer. Only not-found errors get this treatment - any other + // replica failure stays an error rather than shifting read load to the writer. + // A miss on the writer too is a real error, not lag. + const { minMs, maxMs } = this.snapshotsSinceReplicaRetryDelay; + if (maxMs > 0) { + await setTimeout(minMs + Math.random() * Math.max(0, maxMs - minMs)); + try { + const result = await query(this.readOnlyPrisma); + this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "replica_retry" }); + return result; + } catch (replicaRetryError) { + if (!(replicaRetryError instanceof ExecutionSnapshotNotFoundError)) { + this.logger.error("Failed to getSnapshotsSince", { + message: + replicaRetryError instanceof Error + ? replicaRetryError.message + : replicaRetryError, + runId, + snapshotId, + failedDuring: "replica_retry", + }); + return null; + } + // still not on the replica - fall through to the primary + } + } + + try { + const result = await query(this.prisma); + this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "primary" }); + this.logger.warn("getSnapshotsSince: snapshot not yet on replica, served from primary", { + runId, + snapshotId, + }); + return result; + } catch (retryError) { + this.logger.error("Failed to getSnapshotsSince", { + message: retryError instanceof Error ? retryError.message : retryError, + runId, + snapshotId, + failedDuring: "primary_fallback", + }); + return null; + } + } + this.logger.error("Failed to getSnapshotsSince", { message: e instanceof Error ? e.message : e, runId, diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index d615c066b85..d8e9656f395 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -10,6 +10,7 @@ import { TaskRunStatus, Waitpoint, } from "@trigger.dev/database"; +import { ExecutionSnapshotNotFoundError } from "../errors.js"; import { HeartbeatTimeouts } from "../types.js"; import { SystemResources } from "./systems.js"; @@ -273,12 +274,12 @@ export async function getExecutionSnapshotsSince( ): Promise { // Step 1: Find the createdAt of the sinceSnapshotId const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findFirst({ - where: { id: sinceSnapshotId }, + where: { id: sinceSnapshotId, runId }, select: { createdAt: true }, }); if (!sinceSnapshot) { - throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`); + throw new ExecutionSnapshotNotFoundError(sinceSnapshotId); } // Step 2: Fetch snapshots WITHOUT waitpoints to avoid N×M data explosion diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index 77867b1b1b1..15831d10839 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -13,6 +13,10 @@ import { setupTestScenario, generateLargeOutput, } from "./helpers/snapshotTestHelpers.js"; +import { + copySnapshotsToReplica, + createTestMetricsMeter, +} from "./helpers/replicaTestHelpers.js"; import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic"; vi.setConfig({ testTimeout: 120_000 }); @@ -432,6 +436,117 @@ describe("RunEngine getSnapshotsSince", () => { } }); + containerTest( + "returns null when the since snapshot belongs to a different run", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runA = await engine.trigger( + { + number: 1, + friendlyId: generateFriendlyId("run"), + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_foreign_snapshot", + spanId: "s_foreign_snapshot_a", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + const runB = await engine.trigger( + { + number: 2, + friendlyId: generateFriendlyId("run"), + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_foreign_snapshot", + spanId: "s_foreign_snapshot_b", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_foreign_snapshot", + workerQueue: "main", + }); + + const runASnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: runA.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + const runBSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: runB.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + + expect(runASnapshots.length).toBeGreaterThanOrEqual(1); + expect(runBSnapshots.length).toBeGreaterThanOrEqual(1); + + const runASnapshot = runASnapshots[0]; + + // Poll run B using a snapshot id that belongs to run A. + const result = await engine.getSnapshotsSince({ + runId: runB.id, + snapshotId: runASnapshot.id, + }); + + expect(result).toBeNull(); + } finally { + await engine.quit(); + } + } + ); + // Direct database tests for the core function containerTest( "direct test: large waitpoint scenario - 100 waitpoints with 10KB outputs", @@ -679,4 +794,619 @@ describe("RunEngine getSnapshotsSince", () => { } } ); + + containerTest( + "falls back to the primary when the replica is missing the since snapshot", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const { meter, getCounterValue } = createTestMetricsMeter(); + + const engine = new RunEngine({ + prisma, + // An empty (schema-only) database stands in for a read replica that has not + // caught up: every lookup on it misses, so the engine must fall back to the + // primary instead of failing the poll. + readOnlyPrisma: schemaOnlyPrisma, + readReplicaSnapshotsSinceEnabled: true, + // Tiny jitter window: the replica is permanently empty here, so the retry + // always misses - no need to pay a realistic delay. + readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 }, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_replica_fallback", + spanId: "s_replica_fallback", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_replica_fallback", + workerQueue: "main", + }); + + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + expect(allSnapshots.length).toBeGreaterThan(1); + + // The since-snapshot exists only on the primary - the replica misses it. + const firstSnapshot = allSnapshots[0]; + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: firstSnapshot.id, + }); + + // Served by the primary fallback, not a failed poll. + expect(result).not.toBeNull(); + const expectedSnapshots = allSnapshots.filter( + (s) => s.createdAt.getTime() > firstSnapshot.createdAt.getTime() + ); + expect(expectedSnapshots.length).toBeGreaterThan(0); + expect(result!.length).toBe(expectedSnapshots.length); + // Compare as sorted lists: same-millisecond snapshots have unspecified relative + // order in both the engine query and this test's query. + expect(result!.map((s) => s.snapshot.id).sort()).toEqual( + expectedSnapshots.map((s) => s.id).sort() + ); + + expect( + await getCounterValue("run_engine.snapshots_since.replica_miss", { outcome: "primary" }) + ).toBe(1); + // The replica retry never succeeds against a permanently empty replica. + expect( + await getCounterValue("run_engine.snapshots_since.replica_miss", { + outcome: "replica_retry", + }) + ).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "serves the read from the replica after a jittered retry when it catches up", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const { meter, getCounterValue } = createTestMetricsMeter(); + + const engine = new RunEngine({ + prisma, + // The schema-only database stands in for a lagging replica: empty when the + // poll first arrives, caught up by the time the jittered retry fires. + readOnlyPrisma: schemaOnlyPrisma, + readReplicaSnapshotsSinceEnabled: true, + // A near-deterministic ~400ms window: long enough to seed the replica + // mid-flight (below), short enough to keep the test fast. + readReplicaSnapshotsSinceRetryDelay: { minMs: 400, maxMs: 401 }, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_replica_retry", + spanId: "s_replica_retry", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_replica_retry", + workerQueue: "main", + }); + + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + expect(allSnapshots.length).toBeGreaterThan(1); + + const firstSnapshot = allSnapshots[0]; + + // Warm the replica connection so the engine's first attempt is a fast point + // read - a cold Prisma connect could outlast the 100ms seeding window below. + await schemaOnlyPrisma.$queryRaw`SELECT 1`; + + // Kick off the poll against the still-empty replica, then seed the replica + // well before the ~400ms jittered retry fires - simulating the replica + // catching up while the engine waits. + const resultPromise = engine.getSnapshotsSince({ + runId: run.id, + snapshotId: firstSnapshot.id, + }); + await setTimeout(100); + await copySnapshotsToReplica(prisma, schemaOnlyPrisma, run.id); + const result = await resultPromise; + + expect(result).not.toBeNull(); + const expectedSnapshots = allSnapshots.filter( + (s) => s.createdAt.getTime() > firstSnapshot.createdAt.getTime() + ); + expect(expectedSnapshots.length).toBeGreaterThan(0); + expect(result!.length).toBe(expectedSnapshots.length); + // Compare as sorted lists: same-millisecond snapshots have unspecified relative + // order in both the engine query and this test's query. + expect(result!.map((s) => s.snapshot.id).sort()).toEqual( + expectedSnapshots.map((s) => s.id).sort() + ); + + // Recovered on the replica retry - the writer was never consulted. + expect( + await getCounterValue("run_engine.snapshots_since.replica_miss", { + outcome: "replica_retry", + }) + ).toBe(1); + expect( + await getCounterValue("run_engine.snapshots_since.replica_miss", { outcome: "primary" }) + ).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "returns null when the snapshot is missing on both replica and primary", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const { meter, getCounterValue } = createTestMetricsMeter(); + + const engine = new RunEngine({ + prisma, + readOnlyPrisma: schemaOnlyPrisma, + readReplicaSnapshotsSinceEnabled: true, + readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 }, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + }); + + try { + // runId is never consulted - the since-snapshot lookup throws on the bogus id first. + const result = await engine.getSnapshotsSince({ + runId: "run_does_not_exist", + snapshotId: "snapshot_does_not_exist", + }); + + expect(result).toBeNull(); + + // Permanent misses are deliberately NOT counted - the counter only tracks + // reads actually served by the primary fallback. + expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "serves the replica's view when the replica has the since snapshot but lags behind the primary", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const { meter, getCounterValue } = createTestMetricsMeter(); + + const engine = new RunEngine({ + prisma, + // The schema-only database stands in for a replica that has the since-snapshot + // but lags behind the primary by one snapshot (the newest one is excluded below). + readOnlyPrisma: schemaOnlyPrisma, + readReplicaSnapshotsSinceEnabled: true, + readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 }, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_replica_stale_tail", + spanId: "s_replica_stale_tail", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_replica_stale_tail", + workerQueue: "main", + }); + + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + expect(allSnapshots.length).toBeGreaterThanOrEqual(3); + + const since = allSnapshots[0]; + const tail = allSnapshots[allSnapshots.length - 1]; + + // Replica has everything EXCEPT the newest snapshot - a lagging-but-usable replica. + await copySnapshotsToReplica(prisma, schemaOnlyPrisma, run.id, { + excludeSnapshotIds: [tail.id], + }); + + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: since.id, + }); + + expect(result).not.toBeNull(); + + // The replica's view: everything after the since snapshot, minus the tail + // it hasn't received yet. If reads were hitting the primary, the tail would + // be present and these assertions would fail. + const expectedSnapshots = allSnapshots.filter( + (s) => s.createdAt.getTime() > since.createdAt.getTime() && s.id !== tail.id + ); + expect(result!.map((s) => s.snapshot.id)).not.toContain(tail.id); + // Compare as sorted lists: same-millisecond snapshots have unspecified relative + // order in both the engine query and this test's query. + expect(result!.map((s) => s.snapshot.id).sort()).toEqual( + expectedSnapshots.map((s) => s.id).sort() + ); + + expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "reads from the primary when the flag is off even with a replica configured", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const { meter, getCounterValue } = createTestMetricsMeter(); + + const engine = new RunEngine({ + prisma, + // Replica configured but EMPTY, flag off: a correct result can only come + // from the primary. + readOnlyPrisma: schemaOnlyPrisma, + readReplicaSnapshotsSinceEnabled: false, + readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 }, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_replica_flag_off", + spanId: "s_replica_flag_off", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_replica_flag_off", + workerQueue: "main", + }); + + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + expect(allSnapshots.length).toBeGreaterThan(1); + + const since = allSnapshots[0]; + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: since.id, + }); + + expect(result).not.toBeNull(); + const expectedSnapshots = allSnapshots.filter( + (s) => s.createdAt.getTime() > since.createdAt.getTime() + ); + expect(expectedSnapshots.length).toBeGreaterThan(0); + // Compare as sorted lists: same-millisecond snapshots have unspecified relative + // order in both the engine query and this test's query. + expect(result!.map((s) => s.snapshot.id).sort()).toEqual( + expectedSnapshots.map((s) => s.id).sort() + ); + + expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "uses the provided transaction client and never falls back", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const { meter, getCounterValue } = createTestMetricsMeter(); + + const engine = new RunEngine({ + prisma, + // Flag ON with an EMPTY replica: if the provided tx didn't bypass the replica, + // the read would miss and (at best) be served by the fallback, incrementing + // the counter. + readOnlyPrisma: schemaOnlyPrisma, + readReplicaSnapshotsSinceEnabled: true, + readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 }, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_replica_tx_bypass", + spanId: "s_replica_tx_bypass", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_replica_tx_bypass", + workerQueue: "main", + }); + + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + expect(allSnapshots.length).toBeGreaterThan(1); + + const since = allSnapshots[0]; + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: since.id, + tx: prisma, + }); + + expect(result).not.toBeNull(); + const expectedSnapshots = allSnapshots.filter( + (s) => s.createdAt.getTime() > since.createdAt.getTime() + ); + expect(expectedSnapshots.length).toBeGreaterThan(0); + // Compare as sorted lists: same-millisecond snapshots have unspecified relative + // order in both the engine query and this test's query. + expect(result!.map((s) => s.snapshot.id).sort()).toEqual( + expectedSnapshots.map((s) => s.id).sort() + ); + + expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); + } finally { + await engine.quit(); + } + } + ); }); diff --git a/internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts b/internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts new file mode 100644 index 00000000000..3b94bae8887 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts @@ -0,0 +1,105 @@ +import { + AggregationTemporality, + InMemoryMetricExporter, + MeterProvider, + PeriodicExportingMetricReader, +} from "@opentelemetry/sdk-metrics"; +import { Prisma, PrismaClient } from "@trigger.dev/database"; + +/** + * Copies a run's TaskRunExecutionSnapshot rows from the primary database into the + * replica database. The replica (a schema-only clone) has no parent rows (no TaskRun), + * so the rows are inserted with FK triggers disabled - exactly how a physical replica's + * data arrives, without FK re-checks. + */ +export async function copySnapshotsToReplica( + primary: PrismaClient, + replica: PrismaClient, + runId: string, + opts?: { excludeSnapshotIds?: string[] } +) { + const rows = await primary.taskRunExecutionSnapshot.findMany({ where: { runId } }); + const toCopy = rows.filter((r) => !(opts?.excludeSnapshotIds ?? []).includes(r.id)); + + await replica.$transaction(async (tx) => { + // SET LOCAL applies for the duration of this transaction (same connection), + // disabling FK triggers like a physical replica's apply process. + await tx.$executeRawUnsafe(`SET LOCAL session_replication_role = replica`); + + for (const row of toCopy) { + await tx.taskRunExecutionSnapshot.create({ + data: { + id: row.id, + engine: row.engine, + executionStatus: row.executionStatus, + description: row.description, + isValid: row.isValid, + error: row.error, + previousSnapshotId: row.previousSnapshotId, + runId: row.runId, + runStatus: row.runStatus, + batchId: row.batchId, + attemptNumber: row.attemptNumber, + environmentId: row.environmentId, + environmentType: row.environmentType, + projectId: row.projectId, + organizationId: row.organizationId, + completedWaitpointOrder: row.completedWaitpointOrder, + checkpointId: row.checkpointId, + workerId: row.workerId, + runnerId: row.runnerId, + // Preserve timestamps exactly - the snapshots-since window query depends on them. + createdAt: row.createdAt, + updatedAt: row.updatedAt, + lastHeartbeatAt: row.lastHeartbeatAt, + metadata: row.metadata === null ? Prisma.DbNull : row.metadata, + }, + }); + } + }); +} + +/** + * Creates a real OTel meter backed by an in-memory exporter, plus a helper to read + * a counter's current cumulative value. No mocks - this exercises the same metrics + * pipeline production uses. + */ +export function createTestMetricsMeter() { + const exporter = new InMemoryMetricExporter(AggregationTemporality.CUMULATIVE); + // Long interval: exports only happen via explicit forceFlush() below. + const reader = new PeriodicExportingMetricReader({ exporter, exportIntervalMillis: 3_600_000 }); + const meterProvider = new MeterProvider({ readers: [reader] }); + const meter = meterProvider.getMeter("test"); + + const getCounterValue = async ( + name: string, + attributes?: Record + ): Promise => { + await reader.forceFlush(); + const resourceMetrics = exporter.getMetrics(); + + // Cumulative temporality: every export batch carries the full running total, + // so read the most recent batch that contains the metric. A counter that was + // never added to exports no data points - treat that as 0. When `attributes` + // is provided, only data points whose attributes match are summed. + for (let i = resourceMetrics.length - 1; i >= 0; i--) { + for (const scopeMetrics of resourceMetrics[i].scopeMetrics) { + for (const metric of scopeMetrics.metrics) { + if (metric.descriptor.name === name && metric.dataPoints.length > 0) { + return metric.dataPoints + .filter( + (dp) => + !attributes || + Object.entries(attributes).every(([key, value]) => dp.attributes[key] === value) + ) + .reduce((sum, dp) => sum + (dp.value as number), 0); + } + } + } + } + + return 0; + }; + + return { meter, getCounterValue }; +} diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index e63b1c81f8b..0077478318b 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -207,6 +207,10 @@ export type RunEngineOptions = { * of the primary. Defaults to false. Callers passing an explicit `tx` always use * that client regardless of this flag. */ readReplicaSnapshotsSinceEnabled?: boolean; + /** Jittered delay bounds for the single replica retry `getSnapshotsSince` performs when + * the since snapshot is not yet on the replica, before falling back to the primary. + * Set maxMs to 0 (or any value <= 0) to skip the replica retry and go straight to the primary. */ + readReplicaSnapshotsSinceRetryDelay?: { minMs: number; maxMs: number }; tracer: Tracer; meter?: Meter; logger?: Logger; diff --git a/internal-packages/testcontainers/src/index.ts b/internal-packages/testcontainers/src/index.ts index 8b687402f6d..0047f996df9 100644 --- a/internal-packages/testcontainers/src/index.ts +++ b/internal-packages/testcontainers/src/index.ts @@ -181,11 +181,7 @@ const clonedPostgresContainer = async ({}, use: Use) const baseUri = container.getConnectionUri(); const cloneDb = `test_${pgCloneCounter++}`; - const admin = new PrismaClient({ - datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, - }); - await admin.$executeRawUnsafe(`CREATE DATABASE "${cloneDb}" TEMPLATE "${POSTGRES_TEMPLATE_DB}"`); - await admin.$disconnect(); + await createDatabaseFromTemplate(baseUri, cloneDb); const cloneUri = postgresUriWithDatabase(baseUri, cloneDb); const view = new Proxy(container, { @@ -200,19 +196,57 @@ const clonedPostgresContainer = async ({}, use: Use) try { await use(view); } finally { - // Best-effort drop so clones don't pile up in the worker's pg over a long suite. WITH (FORCE) - // terminates any lingering backends (pg 13+). A failed drop is harmless - the whole container is - // reaped on worker exit - so we never let cleanup fail the test. - const cleanup = new PrismaClient({ - datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, - }); - try { - await cleanup.$executeRawUnsafe(`DROP DATABASE IF EXISTS "${cloneDb}" WITH (FORCE)`); - } catch { - // ignore - reaped with the container anyway - } finally { - await cleanup.$disconnect(); - } + await dropCloneDatabase(baseUri, cloneDb); + } +}; + +const createDatabaseFromTemplate = async (baseUri: string, cloneDb: string) => { + const admin = new PrismaClient({ + datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, + }); + try { + await admin.$executeRawUnsafe( + `CREATE DATABASE "${cloneDb}" TEMPLATE "${POSTGRES_TEMPLATE_DB}"` + ); + } finally { + await admin.$disconnect(); + } +}; + +// Best-effort drop so clones don't pile up in the worker's pg over a long suite. WITH (FORCE) +// terminates any lingering backends (pg 13+). A failed drop is harmless - the whole container is +// reaped on worker exit - so we never let cleanup fail the test. +const dropCloneDatabase = async (baseUri: string, cloneDb: string) => { + const cleanup = new PrismaClient({ + datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, + }); + try { + await cleanup.$executeRawUnsafe(`DROP DATABASE IF EXISTS "${cloneDb}" WITH (FORCE)`); + } catch { + // ignore - reaped with the container anyway + } finally { + await cleanup.$disconnect(); + } +}; + +// A second migrated-but-empty database on the same worker postgres, cloned from the schema +// template. For tests that need to simulate a read replica that hasn't caught up: schema +// present, rows absent. Lazy - only booted when a test destructures it. +const schemaOnlyPrismaFixture = async ({}: {}, use: Use) => { + const container = await getWorkerPostgresContainer(); + const baseUri = container.getConnectionUri(); + const cloneDb = `schema_only_${pgCloneCounter++}`; + + await createDatabaseFromTemplate(baseUri, cloneDb); + + const prisma = new PrismaClient({ + datasources: { db: { url: postgresUriWithDatabase(baseUri, cloneDb) } }, + }); + try { + await use(prisma); + } finally { + await logCleanup("schemaOnlyPrisma", prisma.$disconnect()); + await dropCloneDatabase(baseUri, cloneDb); } }; @@ -454,6 +488,7 @@ export const postgresAndRedisTest = test.extend({ type ContainerTestContext = { postgresContainer: StartedPostgreSqlContainer; prisma: PrismaClient; + schemaOnlyPrisma: PrismaClient; redisContainer: StartedRedisContainer; resetRedis: void; redisOptions: RedisOptions; @@ -468,6 +503,7 @@ type ContainerTestContext = { export const containerTest = test.extend({ postgresContainer: clonedPostgresContainer, prisma: prismaFromContainer, + schemaOnlyPrisma: schemaOnlyPrismaFixture, redisContainer: [bootWorkerRedis, { scope: "worker" }], resetRedis: [flushRedis, { auto: true }], redisOptions, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 782b62cf7ff..807ae796582 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1329,6 +1329,9 @@ importers: '@internal/testcontainers': specifier: workspace:* version: link:../testcontainers + '@opentelemetry/sdk-metrics': + specifier: 2.7.1 + version: 2.7.1(@opentelemetry/api@1.9.1) '@types/seedrandom': specifier: ^3.0.8 version: 3.0.8