From 64a80b423fb0b958ef764e0c3cc2becdf179f8bb Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 9 Jun 2026 16:17:27 -0700 Subject: [PATCH 1/4] feat(metrics): emit workflow execution and per-block metrics to CloudWatch --- .../cron/cleanup-stale-executions/route.ts | 9 + apps/sim/executor/execution/block-executor.ts | 18 ++ .../logs/execution/logging-session.test.ts | 129 +++++++++++++ .../sim/lib/logs/execution/logging-session.ts | 25 +++ apps/sim/lib/monitoring/metrics.test.ts | 150 +++++++++++++++ apps/sim/lib/monitoring/metrics.ts | 177 +++++++++++++----- 6 files changed, 463 insertions(+), 45 deletions(-) create mode 100644 apps/sim/lib/monitoring/metrics.test.ts diff --git a/apps/sim/app/api/cron/cleanup-stale-executions/route.ts b/apps/sim/app/api/cron/cleanup-stale-executions/route.ts index 99c395d644b..a2767ba097d 100644 --- a/apps/sim/app/api/cron/cleanup-stale-executions/route.ts +++ b/apps/sim/app/api/cron/cleanup-stale-executions/route.ts @@ -8,6 +8,7 @@ import { verifyCronAuth } from '@/lib/auth/internal' import { JOB_RETENTION_HOURS, JOB_STATUS } from '@/lib/core/async-jobs' import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { workflowMetrics } from '@/lib/monitoring/metrics' const logger = createLogger('CleanupStaleExecutions') @@ -32,6 +33,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => { executionId: workflowExecutionLogs.executionId, workflowId: workflowExecutionLogs.workflowId, startedAt: workflowExecutionLogs.startedAt, + trigger: workflowExecutionLogs.trigger, }) .from(workflowExecutionLogs) .where( @@ -72,6 +74,13 @@ export const GET = withRouteHandler(async (request: NextRequest) => { staleDurationMinutes, }) + // Crashed workers never reach a LoggingSession completion path, so this + // is the only place these failures can be counted toward the error rate. 
+ workflowMetrics.recordExecutionCompleted({ + trigger: execution.trigger, + status: 'failed', + }) + cleaned++ } catch (error) { logger.error(`Failed to clean up execution ${execution.executionId}:`, { diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 0fdd6be1530..a4d28c6e3da 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -4,6 +4,7 @@ import { redactApiKeys } from '@/lib/core/security/redaction' import { normalizeStringArray } from '@/lib/core/utils/arrays' import { getBaseUrl } from '@/lib/core/utils/urls' import { compactExecutionPayload } from '@/lib/execution/payloads/serializer' +import { workflowMetrics } from '@/lib/monitoring/metrics' import { containsUserFileWithMetadata, hydrateUserFilesWithBase64, @@ -239,6 +240,7 @@ export class BlockExecutor { if (normalizedOutput.childTraceSpans && Array.isArray(normalizedOutput.childTraceSpans)) { blockLog.childTraceSpans = normalizedOutput.childTraceSpans } + this.recordBlockMetric(block, true, duration) } const { childTraceSpans: _traces, ...outputForState } = normalizedOutput @@ -284,6 +286,21 @@ export class BlockExecutor { } } + private recordBlockMetric(block: SerializedBlock, success: boolean, durationMs: number): void { + const operation = block.config?.params?.operation + workflowMetrics.recordBlockExecuted({ + blockType: block.metadata?.id || 'unknown', + // Operation is user-configured; only emit registry-style identifiers so + // dynamic values can't explode CloudWatch dimension cardinality. + operation: + typeof operation === 'string' && /^[a-zA-Z0-9_-]{1,64}$/.test(operation) + ? operation + : undefined, + success, + durationMs, + }) + } + private buildNodeMetadata(node: DAGNode): WorkflowNodeMetadata { const metadata = node?.metadata ?? 
{} return { @@ -371,6 +388,7 @@ export class BlockExecutor { if (ChildWorkflowError.isChildWorkflowError(error) && error.childTraceSpans.length > 0) { blockLog.childTraceSpans = error.childTraceSpans } + this.recordBlockMetric(block, false, duration) } this.execLogger.error( diff --git a/apps/sim/lib/logs/execution/logging-session.test.ts b/apps/sim/lib/logs/execution/logging-session.test.ts index 7ad138d0f93..be9d15c5702 100644 --- a/apps/sim/lib/logs/execution/logging-session.test.ts +++ b/apps/sim/lib/logs/execution/logging-session.test.ts @@ -39,10 +39,24 @@ const { completeWorkflowExecutionMock, startWorkflowExecutionMock, loadWorkflowStateForExecutionMock, + recordExecutionStartedMock, + recordExecutionCompletedMock, + recordExecutionPausedMock, } = vi.hoisted(() => ({ completeWorkflowExecutionMock: vi.fn(), startWorkflowExecutionMock: vi.fn(), loadWorkflowStateForExecutionMock: vi.fn(), + recordExecutionStartedMock: vi.fn(), + recordExecutionCompletedMock: vi.fn(), + recordExecutionPausedMock: vi.fn(), +})) + +vi.mock('@/lib/monitoring/metrics', () => ({ + workflowMetrics: { + recordExecutionStarted: recordExecutionStartedMock, + recordExecutionCompleted: recordExecutionCompletedMock, + recordExecutionPaused: recordExecutionPausedMock, + }, })) vi.mock('@sim/db', () => ({ @@ -648,3 +662,118 @@ describe('LoggingSession.markExecutionAsFailed workflowId scoping', () => { expect(combined).toContain('force_failed') }) }) + +describe('LoggingSession workflow metrics', () => { + beforeEach(() => { + vi.clearAllMocks() + startWorkflowExecutionMock.mockResolvedValue({}) + completeWorkflowExecutionMock.mockResolvedValue({}) + loadWorkflowStateForExecutionMock.mockResolvedValue({ + blocks: {}, + edges: [], + loops: {}, + parallels: {}, + }) + dbMocks.selectLimit.mockResolvedValue([{ status: 'running' }]) + dbMocks.execute.mockResolvedValue(undefined) + }) + + it('emits ExecutionStarted on start and not on resume', async () => { + const session = new 
LoggingSession('wf-1', 'exec-1', 'api', 'req-1') + await session.start({ workspaceId: 'ws-1' }) + expect(recordExecutionStartedMock).toHaveBeenCalledTimes(1) + expect(recordExecutionStartedMock).toHaveBeenCalledWith({ trigger: 'api' }) + + recordExecutionStartedMock.mockClear() + const resumeSession = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1') + await resumeSession.start({ workspaceId: 'ws-1', skipLogCreation: true }) + expect(recordExecutionStartedMock).not.toHaveBeenCalled() + }) + + it('emits a success completion with trigger and duration', async () => { + const session = new LoggingSession('wf-1', 'exec-1', 'webhook', 'req-1') + await session.complete({ totalDurationMs: 500 }) + + expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1) + expect(recordExecutionCompletedMock).toHaveBeenCalledWith({ + trigger: 'webhook', + status: 'success', + durationMs: 500, + }) + }) + + it('emits a failed completion via completeWithError', async () => { + const session = new LoggingSession('wf-1', 'exec-1', 'schedule', 'req-1') + await session.completeWithError({ totalDurationMs: 250, error: { message: 'boom' } }) + + expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1) + expect(recordExecutionCompletedMock).toHaveBeenCalledWith({ + trigger: 'schedule', + status: 'failed', + durationMs: 250, + }) + }) + + it('emits a cancelled completion via completeWithCancellation', async () => { + const session = new LoggingSession('wf-1', 'exec-1', 'manual', 'req-1') + await session.completeWithCancellation({ totalDurationMs: 100 }) + + expect(recordExecutionCompletedMock).toHaveBeenCalledWith({ + trigger: 'manual', + status: 'cancelled', + durationMs: 100, + }) + }) + + it('emits ExecutionPaused (not a completion) on pause, then failed if markAsFailed follows', async () => { + const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1') + await session.completeWithPause({ totalDurationMs: 100 }) + + 
expect(recordExecutionPausedMock).toHaveBeenCalledTimes(1) + expect(recordExecutionPausedMock).toHaveBeenCalledWith({ trigger: 'api' }) + expect(recordExecutionCompletedMock).not.toHaveBeenCalled() + + await session.markAsFailed('pause persistence failed') + expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1) + expect(recordExecutionCompletedMock).toHaveBeenCalledWith({ + trigger: 'api', + status: 'failed', + durationMs: undefined, + }) + }) + + it('does not double-emit when markAsFailed runs after a completed session', async () => { + const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1') + await session.complete({ totalDurationMs: 500 }) + await session.markAsFailed('timeout') + + expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1) + expect(recordExecutionCompletedMock).toHaveBeenCalledWith({ + trigger: 'api', + status: 'success', + durationMs: 500, + }) + }) + + it('emits exactly one completion when the primary write fails and the fallback succeeds', async () => { + const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1') + completeWorkflowExecutionMock + .mockRejectedValueOnce(new Error('finalize failed')) + .mockResolvedValueOnce({}) + + await session.safeCompleteWithError({ error: { message: 'boom' } }) + + expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1) + expect(recordExecutionCompletedMock).toHaveBeenCalledWith( + expect.objectContaining({ trigger: 'api', status: 'failed' }) + ) + }) + + it('skips the completion metric when the run was already cancelled elsewhere', async () => { + dbMocks.selectLimit.mockResolvedValue([{ status: 'cancelled' }]) + const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1') + await session.completeWithError({ error: { message: 'boom' } }) + + expect(recordExecutionCompletedMock).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index a0fd011dc7d..009fdd2d4e9 100644 --- 
a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -21,6 +21,7 @@ import type { TraceSpan, WorkflowState, } from '@/lib/logs/types' +import { type WorkflowExecutionStatus, workflowMetrics } from '@/lib/monitoring/metrics' import type { SerializableExecutionState } from '@/executor/execution/types' type TriggerData = Record & { @@ -137,6 +138,8 @@ export class LoggingSession { private completionAttemptFailed = false private pendingProgressWrites = new Set>() private postExecutionPromise: Promise | null = null + /** Guards against double-counting ExecutionCompleted across completion paths */ + private completionMetricEmitted = false constructor( workflowId: string, @@ -218,6 +221,12 @@ export class LoggingSession { } } + private emitExecutionCompletedMetric(status: WorkflowExecutionStatus, durationMs?: number): void { + if (this.completionMetricEmitted) return + this.completionMetricEmitted = true + workflowMetrics.recordExecutionCompleted({ trigger: this.triggerType, status, durationMs }) + } + private async completeExecutionWithFinalization(params: { endedAt: string totalDurationMs: number @@ -325,6 +334,7 @@ export class LoggingSession { workflowState: this.workflowState, deploymentVersionId, }) + workflowMetrics.recordExecutionStarted({ trigger: this.triggerType }) } else { // Resume: no cost reload needed. Billing reconciles from the usage_log // ledger (pre-pause rows already exist) plus the live cost summary. 
@@ -364,6 +374,7 @@ export class LoggingSession { }) this.completed = true + this.emitExecutionCompletedMetric('success', duration) if (traceSpans && traceSpans.length > 0) { try { @@ -500,6 +511,7 @@ export class LoggingSession { }) this.completed = true + this.emitExecutionCompletedMetric('failed', Math.max(1, durationMs)) try { const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import( @@ -593,6 +605,7 @@ export class LoggingSession { }) this.completed = true + this.emitExecutionCompletedMetric('cancelled', Math.max(1, durationMs)) try { const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import( @@ -688,6 +701,7 @@ export class LoggingSession { }) this.completed = true + workflowMetrics.recordExecutionPaused({ trigger: this.triggerType }) try { const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import( @@ -779,6 +793,7 @@ export class LoggingSession { workflowState: this.workflowState, deploymentVersionId, }) + workflowMetrics.recordExecutionStarted({ trigger: this.triggerType }) if (this.requestId) { logger.debug( @@ -969,6 +984,7 @@ export class LoggingSession { this.requestId, this.workflowId ) + this.emitExecutionCompletedMetric('failed') } static async markExecutionAsFailed( @@ -1056,6 +1072,15 @@ export class LoggingSession { this.completed = true + if (params.status === 'pending') { + workflowMetrics.recordExecutionPaused({ trigger: this.triggerType }) + } else { + this.emitExecutionCompletedMetric( + params.status === 'failed' || params.status === 'cancelled' ? 
params.status : 'success', + params.totalDurationMs || 0 + ) + } + logger.info( `[${this.requestId || 'unknown'}] Cost-only fallback succeeded for execution ${this.executionId}` ) diff --git a/apps/sim/lib/monitoring/metrics.test.ts b/apps/sim/lib/monitoring/metrics.test.ts new file mode 100644 index 00000000000..115e4b63200 --- /dev/null +++ b/apps/sim/lib/monitoring/metrics.test.ts @@ -0,0 +1,150 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockSend } = vi.hoisted(() => { + process.env.AWS_ACCESS_KEY_ID = 'test-access-key' + return { mockSend: vi.fn() } +}) + +vi.mock('@aws-sdk/client-cloudwatch', () => { + class MockCloudWatchClient { + send = mockSend + } + class MockPutMetricDataCommand { + input: unknown + constructor(input: unknown) { + this.input = input + } + } + return { + CloudWatchClient: MockCloudWatchClient, + PutMetricDataCommand: MockPutMetricDataCommand, + StandardUnit: { Count: 'Count', Milliseconds: 'Milliseconds', None: 'None' }, + } +}) + +import { flushMetrics, hostedKeyMetrics, workflowMetrics } from '@/lib/monitoring/metrics' + +interface SentCommand { + input: { Namespace: string; MetricData: Array> } +} + +function sentCommands(): SentCommand['input'][] { + return mockSend.mock.calls.map(([cmd]) => (cmd as SentCommand).input) +} + +function findDatum(namespace: string, metricName: string) { + const batch = sentCommands().find((c) => c.Namespace === namespace) + return batch?.MetricData.find((d) => d.MetricName === metricName) +} + +describe('CloudWatch metrics emitter', () => { + beforeEach(async () => { + await flushMetrics() + vi.clearAllMocks() + mockSend.mockResolvedValue({}) + }) + + it('groups buffered points into one PutMetricData call per namespace', async () => { + hostedKeyMetrics.recordUsed({ provider: 'openai', tool: 'gpt', key: 'OPENAI_API_KEY' }) + workflowMetrics.recordExecutionStarted({ trigger: 'api' }) + + await flushMetrics() + + const namespaces = 
sentCommands().map((c) => c.Namespace) + expect(namespaces).toHaveLength(2) + expect(namespaces).toContain('Sim/HostedKey') + expect(namespaces).toContain('Sim/Workflow') + }) + + it('drains the buffer so a second flush sends nothing', async () => { + workflowMetrics.recordExecutionStarted({ trigger: 'manual' }) + + await flushMetrics() + await flushMetrics() + + expect(mockSend).toHaveBeenCalledTimes(1) + }) + + it('emits ExecutionCompleted with Trigger and Status dimensions plus duration', async () => { + workflowMetrics.recordExecutionCompleted({ + trigger: 'webhook', + status: 'failed', + durationMs: 1234, + }) + + await flushMetrics() + + const completed = findDatum('Sim/Workflow', 'ExecutionCompleted') + expect(completed).toMatchObject({ Value: 1, Unit: 'Count' }) + expect(completed?.Dimensions).toEqual( + expect.arrayContaining([ + expect.objectContaining({ Name: 'Environment' }), + { Name: 'Trigger', Value: 'webhook' }, + { Name: 'Status', Value: 'failed' }, + ]) + ) + + const duration = findDatum('Sim/Workflow', 'ExecutionDuration') + expect(duration).toMatchObject({ Value: 1234, Unit: 'Milliseconds' }) + }) + + it('skips ExecutionDuration when durationMs is unknown', async () => { + workflowMetrics.recordExecutionCompleted({ trigger: 'schedule', status: 'failed' }) + + await flushMetrics() + + expect(findDatum('Sim/Workflow', 'ExecutionCompleted')).toBeDefined() + expect(findDatum('Sim/Workflow', 'ExecutionDuration')).toBeUndefined() + }) + + it('emits BlockExecuted with BlockType/Operation/Success and BlockDuration without Operation', async () => { + workflowMetrics.recordBlockExecuted({ + blockType: 'cloudwatch', + operation: 'put_metric_data', + success: false, + durationMs: 42, + }) + + await flushMetrics() + + const executed = findDatum('Sim/Workflow', 'BlockExecuted') + expect(executed?.Dimensions).toEqual( + expect.arrayContaining([ + { Name: 'BlockType', Value: 'cloudwatch' }, + { Name: 'Operation', Value: 'put_metric_data' }, + { Name: 'Success', 
Value: 'false' }, + ]) + ) + + const duration = findDatum('Sim/Workflow', 'BlockDuration') + expect(duration).toMatchObject({ Value: 42, Unit: 'Milliseconds' }) + expect(duration?.Dimensions).toEqual([ + expect.objectContaining({ Name: 'Environment' }), + { Name: 'BlockType', Value: 'cloudwatch' }, + ]) + }) + + it('omits the Operation dimension when not provided', async () => { + workflowMetrics.recordBlockExecuted({ blockType: 'agent', success: true, durationMs: 5 }) + + await flushMetrics() + + const executed = findDatum('Sim/Workflow', 'BlockExecuted') + expect(executed?.Dimensions?.map((d: { Name: string }) => d.Name)).not.toContain('Operation') + }) + + it('drops the batch instead of throwing when PutMetricData fails', async () => { + mockSend.mockRejectedValueOnce(new Error('cloudwatch down')) + workflowMetrics.recordExecutionStarted({ trigger: 'chat' }) + + await expect(flushMetrics()).resolves.toBeUndefined() + + vi.clearAllMocks() + mockSend.mockResolvedValue({}) + await flushMetrics() + expect(mockSend).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/lib/monitoring/metrics.ts b/apps/sim/lib/monitoring/metrics.ts index 486ba57175e..6ca101b95a3 100644 --- a/apps/sim/lib/monitoring/metrics.ts +++ b/apps/sim/lib/monitoring/metrics.ts @@ -1,25 +1,26 @@ /** - * Hosted-key metrics → CloudWatch. + * Application metrics → CloudWatch. * - * Emitted to CloudWatch (not OTel/Prometheus) because hosted-key work runs in - * both the long-lived web app and ephemeral trigger.dev workers. CloudWatch + * Emitted to CloudWatch (not OTel/Prometheus) because this work runs in both + * the long-lived web app and ephemeral trigger.dev workers. CloudWatch * aggregates pushed values server-side (additively), so one-shot worker * processes don't break aggregation the way cumulative Prometheus counters do * (no per-process series collisions, no counter-reset math, no delta plumbing). 
* - * Dimensions stay low-cardinality (Provider, Tool, Key, Reason, Environment) — - * CloudWatch bills per unique dimension combination. `Key` is the env-var NAME + * Dimensions stay low-cardinality — CloudWatch bills per unique dimension + * combination. Hosted-key metrics use Provider/Tool/Key/Reason; workflow + * metrics use Trigger/Status/BlockType/Operation. `Key` is the env-var NAME * of the chosen hosted key (e.g. `PERPLEXITY_API_KEY_2`), never the secret. - * Per-workspace/user cost lives in the `usage_log` table, never on a dimension. + * Per-workspace/user breakdowns live in the database, never on a dimension. * - * Records buffer in-process and flush asynchronously via PutMetricData (batched, - * off the request path). Flushing is automatic — a 5s timer, a buffer-size - * threshold, and SIGTERM/SIGINT/beforeExit (the exit handlers AWAIT the final - * drain, so both long-lived app processes and ephemeral trigger.dev workers push - * their last batch before the process exits). flushHostedKeyMetrics() is also - * exported for explicit/early draining (e.g. tests). The buffer is hard-capped: - * if CloudWatch flushing stalls it drops the oldest points rather than growing - * unbounded. + * Records buffer in-process and flush asynchronously via PutMetricData (batched + * per namespace, off the request path). Flushing is automatic — a 5s timer, a + * buffer-size threshold, and SIGTERM/SIGINT/beforeExit (the exit handlers AWAIT + * the final drain, so both long-lived app processes and ephemeral trigger.dev + * workers push their last batch before the process exits). flushMetrics() is + * also exported for explicit/early draining (e.g. tests). The buffer is + * hard-capped: if CloudWatch flushing stalls it drops the oldest points rather + * than growing unbounded. 
*/ import { @@ -30,9 +31,10 @@ import { } from '@aws-sdk/client-cloudwatch' import { createLogger } from '@sim/logger' -const logger = createLogger('HostedKeyMetrics') +const logger = createLogger('CloudWatchMetrics') -const NAMESPACE = 'Sim/HostedKey' +const HOSTED_KEY_NAMESPACE = 'Sim/HostedKey' +const WORKFLOW_NAMESPACE = 'Sim/Workflow' const MAX_BATCH = 1000 // CloudWatch PutMetricData hard limit per request const FLUSH_INTERVAL_MS = 5_000 const FLUSH_THRESHOLD = 1000 // flush once the buffer reaches this many points @@ -42,19 +44,31 @@ type ThrottleReason = 'billing_actor_limit' | 'upstream_retries_exhausted' type QueueReason = 'actor_requests' | 'dimension' | 'queue_position' type FailureReason = 'rate_limited' | 'auth' | 'other' +export type WorkflowExecutionStatus = 'success' | 'failed' | 'cancelled' + // Deployed envs (app + trigger worker) carry static AWS creds; local dev does // not. No creds → no-op, so recorders stay always-safe to call (same contract // as the previous no-op-meter behavior). const ENABLED = Boolean(process.env.AWS_ACCESS_KEY_ID) +// GRAFANA_DEPLOYMENT_ENVIRONMENT is the per-environment label already set for +// trigger.dev telemetry — without it (or one of the OTEL_/DEPLOYMENT_ vars), +// the NODE_ENV fallback collapses staging into 'production' since staging +// builds also run with NODE_ENV=production. 
const ENVIRONMENT = process.env.OTEL_DEPLOYMENT_ENVIRONMENT || process.env.DEPLOYMENT_ENVIRONMENT || + process.env.GRAFANA_DEPLOYMENT_ENVIRONMENT || process.env.NODE_ENV || 'development' +interface BufferedDatum { + namespace: string + datum: MetricDatum +} + let client: CloudWatchClient | undefined -let buffer: MetricDatum[] = [] +let buffer: BufferedDatum[] = [] let dropped = 0 let timer: ReturnType | undefined let handlersRegistered = false @@ -69,13 +83,13 @@ function getClient(): CloudWatchClient { function ensureBackground(): void { if (timer) return timer = setInterval(() => { - void flushHostedKeyMetrics() + void flushMetrics() }, FLUSH_INTERVAL_MS) timer.unref?.() if (!handlersRegistered) { handlersRegistered = true const onExit = async () => { - await flushHostedKeyMetrics() + await flushMetrics() } process.once('SIGTERM', onExit) process.once('SIGINT', onExit) @@ -92,6 +106,7 @@ function buildDimensions(labels: Record) { } function enqueue( + namespace: string, MetricName: string, Value: number, Unit: StandardUnit, @@ -99,11 +114,14 @@ function enqueue( ): void { if (!ENABLED) return buffer.push({ - MetricName, - Value, - Unit, - Timestamp: new Date(), - Dimensions: buildDimensions(labels), + namespace, + datum: { + MetricName, + Value, + Unit, + Timestamp: new Date(), + Dimensions: buildDimensions(labels), + }, }) if (buffer.length > MAX_BUFFER) { // Flushing has stalled (CloudWatch slow/erroring) — bound memory by dropping @@ -113,42 +131,51 @@ function enqueue( dropped += overflow } ensureBackground() - if (buffer.length >= FLUSH_THRESHOLD) void flushHostedKeyMetrics() + if (buffer.length >= FLUSH_THRESHOLD) void flushMetrics() } /** Drain the buffer to CloudWatch. Safe to call repeatedly; await before exit. 
*/ -export async function flushHostedKeyMetrics(): Promise { +export async function flushMetrics(): Promise { if (dropped > 0) { - logger.warn('Dropped hosted-key metric points (buffer cap reached)', { dropped }) + logger.warn('Dropped metric points (buffer cap reached)', { dropped }) dropped = 0 } if (!ENABLED || buffer.length === 0) return const pending = buffer buffer = [] - for (let i = 0; i < pending.length; i += MAX_BATCH) { - const MetricData = pending.slice(i, i + MAX_BATCH) - try { - await getClient().send(new PutMetricDataCommand({ Namespace: NAMESPACE, MetricData })) - } catch (err) { - // Telemetry must never break the request path — log and drop the batch. - logger.warn('PutMetricData failed; dropping batch', { - count: MetricData.length, - error: err instanceof Error ? err.message : String(err), - }) + const byNamespace = new Map() + for (const { namespace, datum } of pending) { + const data = byNamespace.get(namespace) + if (data) data.push(datum) + else byNamespace.set(namespace, [datum]) + } + for (const [Namespace, data] of byNamespace) { + for (let i = 0; i < data.length; i += MAX_BATCH) { + const MetricData = data.slice(i, i + MAX_BATCH) + try { + await getClient().send(new PutMetricDataCommand({ Namespace, MetricData })) + } catch (err) { + // Telemetry must never break the request path — log and drop the batch. + logger.warn('PutMetricData failed; dropping batch', { + namespace: Namespace, + count: MetricData.length, + error: err instanceof Error ? 
err.message : String(err), + }) + } } } } export const hostedKeyMetrics = { recordUsed(labels: { provider: string; tool: string; key: string }) { - enqueue('Used', 1, StandardUnit.Count, { + enqueue(HOSTED_KEY_NAMESPACE, 'Used', 1, StandardUnit.Count, { Provider: labels.provider, Tool: labels.tool, Key: labels.key, }) }, recordFailed(labels: { provider: string; tool: string; key: string; reason: FailureReason }) { - enqueue('Failed', 1, StandardUnit.Count, { + enqueue(HOSTED_KEY_NAMESPACE, 'Failed', 1, StandardUnit.Count, { Provider: labels.provider, Tool: labels.tool, Key: labels.key, @@ -158,37 +185,97 @@ export const hostedKeyMetrics = { recordCostCharged(costUsd: number, labels: { provider: string; tool: string }) { // Unit None: CloudWatch has no USD unit; value is dollars. if (costUsd > 0) - enqueue('CostCharged', costUsd, StandardUnit.None, { + enqueue(HOSTED_KEY_NAMESPACE, 'CostCharged', costUsd, StandardUnit.None, { Provider: labels.provider, Tool: labels.tool, }) }, recordThrottled(labels: { provider: string; tool: string; reason: ThrottleReason }) { - enqueue('Throttled', 1, StandardUnit.Count, { + enqueue(HOSTED_KEY_NAMESPACE, 'Throttled', 1, StandardUnit.Count, { Provider: labels.provider, Tool: labels.tool, Reason: labels.reason, }) }, recordUpstreamRateLimited(labels: { tool: string; key: string }) { - enqueue('UpstreamRateLimited', 1, StandardUnit.Count, { + enqueue(HOSTED_KEY_NAMESPACE, 'UpstreamRateLimited', 1, StandardUnit.Count, { Tool: labels.tool, Key: labels.key, }) }, recordQueueWait(durationMs: number, labels: { provider: string; reason: QueueReason }) { - enqueue('QueueWaitDuration', durationMs, StandardUnit.Milliseconds, { + enqueue(HOSTED_KEY_NAMESPACE, 'QueueWaitDuration', durationMs, StandardUnit.Milliseconds, { Provider: labels.provider, Reason: labels.reason, }) }, recordQueueWaitExceeded(labels: { provider: string; reason: QueueReason }) { - enqueue('QueueWaitExceeded', 1, StandardUnit.Count, { + enqueue(HOSTED_KEY_NAMESPACE, 
'QueueWaitExceeded', 1, StandardUnit.Count, { Provider: labels.provider, Reason: labels.reason, }) }, recordUnknownModelCost(labels: { tool: string }) { - enqueue('UnknownModelCost', 1, StandardUnit.Count, { Tool: labels.tool }) + enqueue(HOSTED_KEY_NAMESPACE, 'UnknownModelCost', 1, StandardUnit.Count, { + Tool: labels.tool, + }) + }, +} + +export const workflowMetrics = { + recordExecutionStarted(labels: { trigger: string }) { + enqueue(WORKFLOW_NAMESPACE, 'ExecutionStarted', 1, StandardUnit.Count, { + Trigger: labels.trigger, + }) + }, + /** + * One terminal outcome per execution. Error rate = failed / (success + failed) + * via CloudWatch metric math over the Status dimension. + */ + recordExecutionCompleted(labels: { + trigger: string + status: WorkflowExecutionStatus + durationMs?: number + }) { + enqueue(WORKFLOW_NAMESPACE, 'ExecutionCompleted', 1, StandardUnit.Count, { + Trigger: labels.trigger, + Status: labels.status, + }) + if (typeof labels.durationMs === 'number') { + enqueue( + WORKFLOW_NAMESPACE, + 'ExecutionDuration', + labels.durationMs, + StandardUnit.Milliseconds, + { + Trigger: labels.trigger, + Status: labels.status, + } + ) + } + }, + /** + * Pause is not terminal — the execution resumes and reaches ExecutionCompleted + * later — so it's tracked separately to keep started-vs-completed math honest. 
+ */ + recordExecutionPaused(labels: { trigger: string }) { + enqueue(WORKFLOW_NAMESPACE, 'ExecutionPaused', 1, StandardUnit.Count, { + Trigger: labels.trigger, + }) + }, + recordBlockExecuted(labels: { + blockType: string + operation?: string + success: boolean + durationMs: number + }) { + enqueue(WORKFLOW_NAMESPACE, 'BlockExecuted', 1, StandardUnit.Count, { + BlockType: labels.blockType, + Operation: labels.operation, + Success: String(labels.success), + }) + enqueue(WORKFLOW_NAMESPACE, 'BlockDuration', labels.durationMs, StandardUnit.Milliseconds, { + BlockType: labels.blockType, + }) }, } From 912349e5fdc9dca663bc6a8950316307b9c7867b Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 9 Jun 2026 16:26:16 -0700 Subject: [PATCH 2/4] fix(metrics): skip zero-ms fallback duration and gate stale-cleanup metric on actual row transition --- .../cron/cleanup-stale-executions/route.ts | 20 +++++++++++++++++-- .../sim/lib/logs/execution/logging-session.ts | 2 +- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/apps/sim/app/api/cron/cleanup-stale-executions/route.ts b/apps/sim/app/api/cron/cleanup-stale-executions/route.ts index a2767ba097d..9f171f3caed 100644 --- a/apps/sim/app/api/cron/cleanup-stale-executions/route.ts +++ b/apps/sim/app/api/cron/cleanup-stale-executions/route.ts @@ -55,7 +55,10 @@ export const GET = withRouteHandler(async (request: NextRequest) => { const staleDurationMinutes = Math.round(staleDurationMs / 60000) const totalDurationMs = Math.min(staleDurationMs, MAX_INT32) - await db + // Conditional on status='running' so a worker that completes between the + // select and this update keeps its own terminal status (and its own + // ExecutionCompleted point) — no force-fail overwrite, no double count. 
+ const transitioned = await db .update(workflowExecutionLogs) .set({ status: 'failed', @@ -67,7 +70,20 @@ export const GET = withRouteHandler(async (request: NextRequest) => { to_jsonb(${`Execution terminated: worker timeout or crash after ${staleDurationMinutes} minutes`}::text) )`, }) - .where(eq(workflowExecutionLogs.id, execution.id)) + .where( + and( + eq(workflowExecutionLogs.id, execution.id), + eq(workflowExecutionLogs.status, 'running') + ) + ) + .returning({ id: workflowExecutionLogs.id }) + + if (transitioned.length === 0) { + logger.info(`Skipped stale execution ${execution.executionId}: no longer running`, { + workflowId: execution.workflowId, + }) + continue + } logger.info(`Cleaned up stale execution ${execution.executionId}`, { workflowId: execution.workflowId, diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index 009fdd2d4e9..a5d2bb5a752 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -1077,7 +1077,7 @@ export class LoggingSession { } else { this.emitExecutionCompletedMetric( params.status === 'failed' || params.status === 'cancelled' ? 
params.status : 'success', - params.totalDurationMs || 0 + params.totalDurationMs ) } From 8673e5cbf67404903fa5b00f4320dd98220f38b3 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 9 Jun 2026 16:34:29 -0700 Subject: [PATCH 3/4] improvement(metrics): one-shot guard on ExecutionStarted across start/fallback paths --- apps/sim/lib/logs/execution/logging-session.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index a5d2bb5a752..9f38cb19c95 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -138,6 +138,8 @@ export class LoggingSession { private completionAttemptFailed = false private pendingProgressWrites = new Set>() private postExecutionPromise: Promise | null = null + /** Guards against double-counting ExecutionStarted across start/fallback paths */ + private startMetricEmitted = false /** Guards against double-counting ExecutionCompleted across completion paths */ private completionMetricEmitted = false @@ -221,6 +223,12 @@ export class LoggingSession { } } + private emitExecutionStartedMetric(): void { + if (this.startMetricEmitted) return + this.startMetricEmitted = true + workflowMetrics.recordExecutionStarted({ trigger: this.triggerType }) + } + private emitExecutionCompletedMetric(status: WorkflowExecutionStatus, durationMs?: number): void { if (this.completionMetricEmitted) return this.completionMetricEmitted = true @@ -334,7 +342,7 @@ export class LoggingSession { workflowState: this.workflowState, deploymentVersionId, }) - workflowMetrics.recordExecutionStarted({ trigger: this.triggerType }) + this.emitExecutionStartedMetric() } else { // Resume: no cost reload needed. Billing reconciles from the usage_log // ledger (pre-pause rows already exist) plus the live cost summary. 
@@ -793,7 +801,7 @@ export class LoggingSession { workflowState: this.workflowState, deploymentVersionId, }) - workflowMetrics.recordExecutionStarted({ trigger: this.triggerType }) + this.emitExecutionStartedMetric() if (this.requestId) { logger.debug( From 83bfcbd37ec168bd0d3c8c49339b85e086a5fd04 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 9 Jun 2026 16:45:05 -0700 Subject: [PATCH 4/4] fix(metrics): count static markExecutionAsFailed terminal failures and omit unknown durations --- .../logs/execution/logging-session.test.ts | 21 ++++++++++- .../sim/lib/logs/execution/logging-session.ts | 36 +++++++++++++++++-- 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/apps/sim/lib/logs/execution/logging-session.test.ts b/apps/sim/lib/logs/execution/logging-session.test.ts index be9d15c5702..14f9894086a 100644 --- a/apps/sim/lib/logs/execution/logging-session.test.ts +++ b/apps/sim/lib/logs/execution/logging-session.test.ts @@ -627,6 +627,7 @@ describe('completeWithError cancelled-status guard', () => { describe('LoggingSession.markExecutionAsFailed workflowId scoping', () => { beforeEach(() => { vi.clearAllMocks() + dbMocks.selectLimit.mockResolvedValue([{ status: 'running', trigger: 'api' }]) dbMocks.updateWhere.mockResolvedValue(undefined) }) @@ -674,7 +675,7 @@ describe('LoggingSession workflow metrics', () => { loops: {}, parallels: {}, }) - dbMocks.selectLimit.mockResolvedValue([{ status: 'running' }]) + dbMocks.selectLimit.mockResolvedValue([{ status: 'running', trigger: 'api' }]) dbMocks.execute.mockResolvedValue(undefined) }) @@ -745,6 +746,8 @@ describe('LoggingSession workflow metrics', () => { it('does not double-emit when markAsFailed runs after a completed session', async () => { const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1') await session.complete({ totalDurationMs: 500 }) + + dbMocks.selectLimit.mockResolvedValue([{ status: 'completed', trigger: 'api' }]) await session.markAsFailed('timeout') 
expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1) @@ -769,6 +772,22 @@ describe('LoggingSession workflow metrics', () => { ) }) + it('static markExecutionAsFailed emits failed only for non-terminal rows', async () => { + dbMocks.selectLimit.mockResolvedValue([{ status: 'running', trigger: 'webhook' }]) + await LoggingSession.markExecutionAsFailed('exec-1', 'crash', undefined, 'wf-1') + + expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1) + expect(recordExecutionCompletedMock).toHaveBeenCalledWith({ + trigger: 'webhook', + status: 'failed', + }) + + recordExecutionCompletedMock.mockClear() + dbMocks.selectLimit.mockResolvedValue([{ status: 'failed', trigger: 'webhook' }]) + await LoggingSession.markExecutionAsFailed('exec-1', 'crash again', undefined, 'wf-1') + expect(recordExecutionCompletedMock).not.toHaveBeenCalled() + }) + it('skips the completion metric when the run was already cancelled elsewhere', async () => { dbMocks.selectLimit.mockResolvedValue([{ status: 'cancelled' }]) const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1') diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index 9f38cb19c95..08fe8f5156e 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -519,7 +519,10 @@ export class LoggingSession { }) this.completed = true - this.emitExecutionCompletedMetric('failed', Math.max(1, durationMs)) + this.emitExecutionCompletedMetric( + 'failed', + typeof totalDurationMs === 'number' ? Math.max(1, totalDurationMs) : undefined + ) try { const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import( @@ -613,7 +616,10 @@ export class LoggingSession { }) this.completed = true - this.emitExecutionCompletedMetric('cancelled', Math.max(1, durationMs)) + this.emitExecutionCompletedMetric( + 'cancelled', + typeof totalDurationMs === 'number' ? 
Math.max(1, totalDurationMs) : undefined + ) try { const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import( @@ -992,7 +998,10 @@ export class LoggingSession { this.requestId, this.workflowId ) - this.emitExecutionCompletedMetric('failed') + // The static helper emits the failure metric when the row transitions from + // a non-terminal status; mark this session as emitted either way so a later + // in-process completion attempt can't add a second point. + this.completionMetricEmitted = true } static async markExecutionAsFailed( @@ -1003,6 +1012,21 @@ export class LoggingSession { ): Promise<void> { try { const message = errorMessage || 'Run failed' + const current = await db + .select({ + status: workflowExecutionLogs.status, + trigger: workflowExecutionLogs.trigger, + }) + .from(workflowExecutionLogs) + .where( + and( + eq(workflowExecutionLogs.executionId, executionId), + eq(workflowExecutionLogs.workflowId, workflowId) + ) + ) + .limit(1) + .then((rows) => rows[0]) + await db .update(workflowExecutionLogs) .set({ @@ -1029,6 +1053,12 @@ export class LoggingSession { ) ) + // Only a transition from a non-terminal status is a new terminal outcome; + // rows already completed/failed/cancelled emitted their point elsewhere. + if (current && (current.status === 'running' || current.status === 'pending')) { + workflowMetrics.recordExecutionCompleted({ trigger: current.trigger, status: 'failed' }) + } + logger.info(`[${requestId || 'unknown'}] Marked execution ${executionId} as failed`) } catch (error) { logger.error(`Failed to mark execution ${executionId} as failed:`, {