Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/dequeue-latency-histogram.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Record client-side dequeue API latency in the supervisor consumer pool as a Prometheus histogram (`queue_consumer_pool_dequeue_duration_seconds`, labelled by `outcome`: success/empty/error).
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
import { SupervisorHttpClient } from "./http.js";
import type { WorkerApiDequeueResponseBody } from "./schemas.js";
import type { QueueConsumer } from "./queueConsumer.js";
import { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";
import { Registry } from "prom-client";

// Mock only the logger
vi.mock("../../utils/structuredLogger.js");
Expand All @@ -16,9 +18,11 @@ class TestQueueConsumer implements QueueConsumer {
public started = false;
public stopped = false;
public onDequeue?: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
public metrics?: ConsumerPoolMetrics;

constructor(opts: any) {
this.onDequeue = opts.onDequeue;
this.metrics = opts.metrics;
}

start(): void {
Expand Down Expand Up @@ -719,6 +723,38 @@ describe("RunQueueConsumerPool", () => {
});
});

describe("Metrics wiring", () => {
  // With a registry configured, the pool's own ConsumerPoolMetrics instance
  // must be the one every created consumer receives.
  it("injects the pool's shared ConsumerPoolMetrics into every consumer when a registry is provided", async () => {
    const registry = new Registry();
    pool = new RunQueueConsumerPool({
      ...defaultOptions,
      metricsRegistry: registry,
      scaling: { strategy: "none", maxConsumerCount: 3 },
    });

    await pool.start();

    expect(testConsumers.length).toBe(3);

    // Bracket access reaches the pool's private metrics field.
    const sharedMetrics = pool["promMetrics"];
    expect(sharedMetrics).toBeInstanceOf(ConsumerPoolMetrics);
    for (const consumer of testConsumers) {
      expect(consumer.metrics).toBe(sharedMetrics);
    }
  });

  // Without a registry, a metrics instance passed in the consumer options
  // must reach the consumer untouched.
  it("preserves a caller-supplied consumer metrics instance when no registry is provided", async () => {
    const suppliedMetrics = new ConsumerPoolMetrics({ register: new Registry() });
    const consumerOptions = { ...defaultOptions.consumer, metrics: suppliedMetrics };
    pool = new RunQueueConsumerPool({
      ...defaultOptions,
      consumer: consumerOptions,
      scaling: { strategy: "none", maxConsumerCount: 1 },
    });

    await pool.start();

    const firstConsumer = testConsumers[0];
    expect(firstConsumer?.metrics).toBe(suppliedMetrics);
  });
});

describe("Backpressure scale-up freeze", () => {
it("freezes scale-up while shouldPauseScaling returns true, then resumes", async () => {
let paused = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ export class RunQueueConsumerPool {

const consumer = this.consumerFactory({
...this.consumerOptions,
// Share the pool's single metrics instance so every consumer records onto
// the same histogram (re-registering the metric name would throw). Fall
// back to a caller-supplied instance rather than clobbering it.
metrics: this.promMetrics ?? this.consumerOptions.metrics,
onDequeue: async (messages, timing) => {
// Always update queue length, default to 0 for empty dequeues or missing value
this.updateQueueLength(messages[0]?.workerQueueLength ?? 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ export interface ConsumerPoolMetricsOptions {
prefix?: string;
}

/**
 * Outcome of a single dequeue API round-trip, used as the value of the
 * low-cardinality `outcome` label on the dequeue latency histogram.
 * - `success`: the call succeeded and returned at least one run
 * - `empty`: the call succeeded but returned no runs (the common idle case)
 * - `error`: the call failed (unsuccessful response, network error, or timeout),
 *   including the defensive case where the client itself throws
 */
export type DequeueOutcome = "success" | "empty" | "error";

export class ConsumerPoolMetrics {
private readonly register: Registry;
private readonly prefix: string;
Expand All @@ -26,6 +35,9 @@ export class ConsumerPoolMetrics {
public readonly queueLengthUpdatesTotal: Counter;
public readonly batchesProcessedTotal: Counter;

// Dequeue API latency (client-side, measured around the dequeue HTTP call)
public readonly dequeueDurationSeconds: Histogram;

constructor(opts: ConsumerPoolMetricsOptions = {}) {
this.register = opts.register ?? new Registry();
this.prefix = opts.prefix ?? "queue_consumer_pool";
Expand Down Expand Up @@ -102,6 +114,23 @@ export class ConsumerPoolMetrics {
help: "Total number of metric batches processed",
registers: [this.register],
});

this.dequeueDurationSeconds = new Histogram({
name: `${this.prefix}_dequeue_duration_seconds`,
help: "Client-side duration of the dequeue API call (POST /engine/v1/worker-actions/dequeue), including the HTTP client's internal retries and backoff",
labelNames: ["outcome"],
// The HTTP client retries internally (up to 5 attempts with 0.5-5s backoff),
// so one observation can span multiple requests plus sleeps. A retryable
// failure surfaces as `error` only after >=7.5s of backoff - the 10-30s
// buckets exist so that mode doesn't collapse into +Inf. The server also
// long-polls (RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS, default 10s),
// parking empty dequeues at ~10s - the 11/12.5/15/20 buckets give the
// quantiles resolution just above that boundary, where the mass sits.
// 60s brackets the worst-case error envelope (5 attempts that each hit
// the ~10s hold, plus backoff); beyond that the connection is hung.
buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 11, 12.5, 15, 20, 30, 60],
registers: [this.register],
});
}

/**
Expand Down Expand Up @@ -157,4 +186,13 @@ export class ConsumerPoolMetrics {
recordQueueLengthUpdate() {
this.queueLengthUpdatesTotal.inc();
}

/**
 * Adds one sample to the dequeue latency histogram.
 * @param seconds How long the dequeue call took, in seconds.
 * @param outcome Label value for the sample: runs returned, nothing returned, or failure.
 */
observeDequeueLatency(seconds: number, outcome: DequeueOutcome) {
  const labels = { outcome };
  this.dequeueDurationSeconds.observe(labels, seconds);
}
}
104 changes: 104 additions & 0 deletions packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { Registry } from "prom-client";
import { RunQueueConsumer } from "./queueConsumer.js";
import { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";
import type { SupervisorHttpClient } from "./http.js";
import type { WorkerApiDequeueResponseBody } from "./schemas.js";

// Mock only the logger (same approach as consumerPool.test.ts)
vi.mock("../../utils/structuredLogger.js");

/**
 * Builds a minimal stand-in for the supervisor HTTP client.
 * Only `dequeue` is provided, as a vitest mock wrapping `dequeueImpl`.
 */
function makeClient(dequeueImpl: () => Promise<unknown>): SupervisorHttpClient {
  const dequeue = vi.fn(dequeueImpl);
  const stub = { dequeue };
  return stub as unknown as SupervisorHttpClient;
}

describe("RunQueueConsumer dequeue latency metric", () => {
  let registry: Registry;
  let poolMetrics: ConsumerPoolMetrics;
  let activeConsumer: RunQueueConsumer | undefined;

  beforeEach(() => {
    vi.clearAllMocks();
    // Fake timers keep the trailing scheduleNextDequeue() from ever firing mid-test.
    vi.useFakeTimers();
    registry = new Registry();
    poolMetrics = new ConsumerPoolMetrics({ register: registry });
  });

  afterEach(() => {
    activeConsumer?.stop();
    vi.clearAllTimers();
    vi.useRealTimers();
  });

  /**
   * Drives a single dequeue iteration to completion.
   *
   * Instead of calling `start()`, the consumer is flagged as enabled and its
   * private `dequeue()` is awaited directly, so no timer-driven loop runs -
   * the metric is recorded before scheduleNextDequeue().
   *
   * Metrics are attached unless `withMetrics` is explicitly `false`.
   */
  async function runOneDequeue(opts: {
    dequeueImpl: () => Promise<unknown>;
    withMetrics?: boolean;
  }) {
    const { dequeueImpl, withMetrics } = opts;
    const metricsOption = withMetrics === false ? {} : { metrics: poolMetrics };

    activeConsumer = new RunQueueConsumer({
      client: makeClient(dequeueImpl),
      intervalMs: 600_000,
      idleIntervalMs: 600_000,
      onDequeue: async () => {},
      ...metricsOption,
    });

    // One structural cast exposes the two private members this helper needs.
    const internals = activeConsumer as unknown as {
      isEnabled: boolean;
      dequeue(): Promise<void>;
    };
    internals.isEnabled = true;
    await internals.dequeue();
  }

  it('records outcome="empty" for a successful empty dequeue', async () => {
    const emptyResponse = async () => ({ success: true, data: [] });
    await runOneDequeue({ dequeueImpl: emptyResponse });

    const exposition = await registry.metrics();
    expect(exposition).toContain(
      'queue_consumer_pool_dequeue_duration_seconds_count{outcome="empty"} 1'
    );
  });

  it('records outcome="success" once per round-trip, regardless of message count', async () => {
    const messages = [{ run: {} }, { run: {} }] as unknown as WorkerApiDequeueResponseBody;
    const batchResponse = async () => ({ success: true, data: messages });
    await runOneDequeue({ dequeueImpl: batchResponse });

    // The two-message batch must yield a single observation, not one per message.
    const exposition = await registry.metrics();
    expect(exposition).toContain(
      'queue_consumer_pool_dequeue_duration_seconds_count{outcome="success"} 1'
    );
  });

  it('records outcome="error" when the response is unsuccessful', async () => {
    const failedResponse = async () => ({ success: false, error: new Error("boom") });
    await runOneDequeue({ dequeueImpl: failedResponse });

    const exposition = await registry.metrics();
    expect(exposition).toContain(
      'queue_consumer_pool_dequeue_duration_seconds_count{outcome="error"} 1'
    );
  });

  // Defensive path: wrapZodFetch traps all errors today, so the real client
  // never throws - this guards against a future client that does.
  it('records outcome="error" when the dequeue call throws', async () => {
    const throwingDequeue = async () => {
      throw new Error("network down");
    };
    await runOneDequeue({ dequeueImpl: throwingDequeue });

    const exposition = await registry.metrics();
    expect(exposition).toContain(
      'queue_consumer_pool_dequeue_duration_seconds_count{outcome="error"} 1'
    );
  });

  it("is a no-op (does not throw) when no metrics instance is provided", async () => {
    const emptyResponse = async () => ({ success: true, data: [] });
    const iteration = runOneDequeue({ dequeueImpl: emptyResponse, withMetrics: false });
    await expect(iteration).resolves.not.toThrow();

    // Nothing was observed, so no labelled count line may appear in the output.
    const exposition = await registry.metrics();
    expect(exposition).not.toContain("queue_consumer_pool_dequeue_duration_seconds_count");
  });
});
21 changes: 19 additions & 2 deletions packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { SimpleStructuredLogger } from "../../utils/structuredLogger.js";
import { SupervisorHttpClient } from "./http.js";
import { WorkerApiDequeueResponseBody, WorkerQueueClass } from "./schemas.js";
import { PreDequeueFn, PreSkipFn } from "./types.js";
import type { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";

export interface QueueConsumer {
start(): void;
Expand All @@ -18,6 +19,8 @@ export type RunQueueConsumerOptions = {
/** Which worker-queue class this consumer pulls from. Defaults to the worker's region queue. */
queueClass?: WorkerQueueClass;
onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>;
/** Optional shared pool metrics. When provided, dequeue API latency is recorded as a histogram. */
metrics?: ConsumerPoolMetrics;
};

export class RunQueueConsumer implements QueueConsumer {
Expand All @@ -27,6 +30,7 @@ export class RunQueueConsumer implements QueueConsumer {
private readonly maxRunCount?: number;
private readonly queueClass?: WorkerQueueClass;
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>;
private readonly metrics?: ConsumerPoolMetrics;

private readonly logger = new SimpleStructuredLogger("queue-consumer");

Expand All @@ -46,6 +50,7 @@ export class RunQueueConsumer implements QueueConsumer {
this.lastScheduledIntervalMs = opts.idleIntervalMs;
this.onDequeue = opts.onDequeue;
this.client = opts.client;
this.metrics = opts.metrics;
}

start() {
Expand Down Expand Up @@ -116,18 +121,26 @@ export class RunQueueConsumer implements QueueConsumer {

let nextIntervalMs = this.idleIntervalMs;

const dequeueStart = performance.now();

try {
const dequeueStart = performance.now();
const response = await this.client.dequeue({
maxResources: preDequeueResult?.maxResources,
maxRunCount: this.maxRunCount,
queueClass: this.queueClass,
});
const dequeueResponseMs = Math.round(performance.now() - dequeueStart);
const dequeueDurationSeconds = (performance.now() - dequeueStart) / 1000;
const dequeueResponseMs = Math.round(dequeueDurationSeconds * 1000);

if (!response.success) {
this.metrics?.observeDequeueLatency(dequeueDurationSeconds, "error");
this.logger.error("Failed to dequeue", { error: response.error });
} else {
this.metrics?.observeDequeueLatency(
dequeueDurationSeconds,
response.data.length > 0 ? "success" : "empty"
);

try {
await this.onDequeue(response.data, { dequeueResponseMs, pollingIntervalMs: this.lastScheduledIntervalMs });

Expand All @@ -139,6 +152,10 @@ export class RunQueueConsumer implements QueueConsumer {
}
}
} catch (clientError) {
// wrapZodFetch traps all errors into { success: false }, so this branch is
// unreachable with the real client today. Record defensively so a future
// client that throws can't silently lose error samples.
this.metrics?.observeDequeueLatency((performance.now() - dequeueStart) / 1000, "error");
this.logger.error("client.dequeue error", { error: clientError });
}

Expand Down
Loading