diff --git a/src/batch/services/run-batch.ts b/src/batch/services/run-batch.ts new file mode 100644 index 0000000..85ab8c5 --- /dev/null +++ b/src/batch/services/run-batch.ts @@ -0,0 +1,76 @@ +import type { BatchItem } from "../types/batch-item.js"; +import type { BatchResult, BatchSummary } from "../types/batch-result.js"; +import { toErrorRecord } from "./to-error-record.js"; + +export interface RunBatchOptions { + /** Maximum number of workers running concurrently (clamped to >= 1). */ + concurrency: number; + /** Lazily produced inputs; pulled one at a time as workers free up. */ + items: AsyncIterable; + /** Invoked once per item, in completion order. Never called concurrently. */ + onResult: (result: BatchResult) => void | Promise; + /** Runs a single item; its resolved value becomes the success payload. */ + worker: (item: BatchItem) => Promise; +} + +const noop = (): void => { + // Intentionally empty: used to swallow lock-chain rejections. +}; + +/** + * Run a batch of inputs through `worker` with bounded concurrency. A failure in + * one item is captured as an error record and does not abort the batch; results + * are emitted via `onResult` as each item settles. Returns a run summary. + */ +export async function runBatch({ + items, + concurrency, + worker, + onResult, +}: RunBatchOptions): Promise { + const summary: BatchSummary = { total: 0, succeeded: 0, failed: 0 }; + const iterator = items[Symbol.asyncIterator](); + + // Serialise pulls from the iterator: async iterators must not have + // overlapping next() calls. + let pullLock: Promise = Promise.resolve(); + function nextItem(): Promise> { + const pending = pullLock.then(() => iterator.next()); + pullLock = pending.then(noop, noop); + return pending; + } + + // Serialise emissions so onResult is never re-entered concurrently. + let emitLock: Promise = Promise.resolve(); + function emit(result: BatchResult): Promise { + emitLock = emitLock.then(() => onResult(result)); + return emitLock; + } + + async function drain(): Promise { + while (true) { + const next = await nextItem(); + if (next.done) { + return; + } + + const item = next.value; + summary.total++; + + try { + const data = await worker(item); + summary.succeeded++; + await emit({ ...item, ok: true, data }); + } catch (err) { + summary.failed++; + await emit({ ...item, ok: false, error: toErrorRecord(err) }); + } + } + } + + const workerCount = Math.max(1, Math.floor(concurrency)); + await Promise.all(Array.from({ length: workerCount }, () => drain())); + await emitLock; + + return summary; +} diff --git a/src/batch/services/to-error-record.ts b/src/batch/services/to-error-record.ts new file mode 100644 index 0000000..a4e870d --- /dev/null +++ b/src/batch/services/to-error-record.ts @@ -0,0 +1,13 @@ +import type { BatchErrorRecord } from "../types/batch-result.js"; + +/** Normalise any thrown value into a `{ class, message }` error record. */ +export function toErrorRecord(err: unknown): BatchErrorRecord { + if (err instanceof Error) { + return { + class: err.constructor.name || err.name || "Error", + message: err.message, + }; + } + + return { class: "Error", message: String(err) }; +} diff --git a/src/batch/types/batch-result.ts b/src/batch/types/batch-result.ts new file mode 100644 index 0000000..55418ac --- /dev/null +++ b/src/batch/types/batch-result.ts @@ -0,0 +1,25 @@ +import type { BatchItem } from "./batch-item.js"; + +export interface BatchErrorRecord { + /** Error class name, e.g. "ValidationError". */ + class: string; + message: string; +} + +export interface BatchSuccess extends BatchItem { + data: unknown; + ok: true; +} + +export interface BatchFailure extends BatchItem { + error: BatchErrorRecord; + ok: false; +} + +export type BatchResult = BatchSuccess | BatchFailure; + +export interface BatchSummary { + failed: number; + succeeded: number; + total: number; +} diff --git a/tests/batch/run-batch.test.ts b/tests/batch/run-batch.test.ts new file mode 100644 index 0000000..c905449 --- /dev/null +++ b/tests/batch/run-batch.test.ts @@ -0,0 +1,86 @@ +import { ValidationError } from "@decodo/sdk-ts"; +import { describe, expect, it } from "vitest"; +import { runBatch } from "../../src/batch/services/run-batch.js"; +import type { BatchItem } from "../../src/batch/types/batch-item.js"; +import type { BatchResult } from "../../src/batch/types/batch-result.js"; + +async function* toAsync(inputs: string[]): AsyncGenerator { + let index = 0; + for (const input of inputs) { + await Promise.resolve(); + yield { index, input }; + index++; + } +} + +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +describe("runBatch", () => { + it("runs every item and emits success records", async () => { + const results: BatchResult[] = []; + + const summary = await runBatch({ + items: toAsync(["a", "b", "c"]), + concurrency: 2, + worker: (item) => Promise.resolve(item.input.toUpperCase()), + onResult: (result) => { + results.push(result); + }, + }); + + expect(summary).toEqual({ total: 3, succeeded: 3, failed: 0 }); + expect(results.map((r) => r.index).sort()).toEqual([0, 1, 2]); + const a = results.find((r) => r.input === "a"); + expect(a).toMatchObject({ ok: true, data: "A" }); + }); + + it("captures per-item errors without aborting the batch", async () => { + const results: BatchResult[] = []; + + const summary = await runBatch({ + items: toAsync(["ok-1", "boom", "ok-2"]), + concurrency: 3, + worker: (item) => { + if (item.input === "boom") { + throw new ValidationError("bad input"); + } + return Promise.resolve(item.input); + }, + onResult: (result) => { + results.push(result); + }, + }); + + expect(summary).toEqual({ total: 3, succeeded: 2, failed: 1 }); + const failure = results.find((r) => !r.ok); + expect(failure).toMatchObject({ + ok: false, + input: "boom", + error: { class: "ValidationError", message: "bad input" }, + }); + }); + + it("never exceeds the configured concurrency", async () => { + let active = 0; + let peak = 0; + + await runBatch({ + items: toAsync(["1", "2", "3", "4", "5", "6"]), + concurrency: 2, + worker: async () => { + active++; + peak = Math.max(peak, active); + await delay(5); + active--; + }, + onResult: () => { + // no-op + }, + }); + + expect(peak).toBeLessThanOrEqual(2); + expect(peak).toBeGreaterThan(0); + }); +});