Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions src/batch/services/run-batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import type { BatchItem } from "../types/batch-item.js";
import type { BatchResult, BatchSummary } from "../types/batch-result.js";
import { toErrorRecord } from "./to-error-record.js";

export interface RunBatchOptions {
/** Maximum number of workers running concurrently (clamped to >= 1). */
concurrency: number;
/** Lazily produced inputs; pulled one at a time as workers free up. */
items: AsyncIterable<BatchItem>;
/** Invoked once per item, in completion order. Never called concurrently. */
onResult: (result: BatchResult) => void | Promise<void>;
/** Runs a single item; its resolved value becomes the success payload. */
worker: (item: BatchItem) => Promise<unknown>;
}

const noop = (): void => {
// Intentionally empty: used to swallow lock-chain rejections.
};

/**
* Run a batch of inputs through `worker` with bounded concurrency. A failure in
* one item is captured as an error record and does not abort the batch; results
* are emitted via `onResult` as each item settles. Returns a run summary.
*/
export async function runBatch({
items,
concurrency,
worker,
onResult,
}: RunBatchOptions): Promise<BatchSummary> {
const summary: BatchSummary = { total: 0, succeeded: 0, failed: 0 };
const iterator = items[Symbol.asyncIterator]();

// Serialise pulls from the iterator: async iterators must not have
// overlapping next() calls.
let pullLock: Promise<unknown> = Promise.resolve();
function nextItem(): Promise<IteratorResult<BatchItem>> {
const pending = pullLock.then(() => iterator.next());
pullLock = pending.then(noop, noop);
return pending;
}

// Serialise emissions so onResult is never re-entered concurrently.
let emitLock: Promise<unknown> = Promise.resolve();
function emit(result: BatchResult): Promise<unknown> {
emitLock = emitLock.then(() => onResult(result));
return emitLock;
}

async function drain(): Promise<void> {
while (true) {
const next = await nextItem();
if (next.done) {
return;
}

const item = next.value;
summary.total++;

try {
const data = await worker(item);
summary.succeeded++;
await emit({ ...item, ok: true, data });
} catch (err) {
summary.failed++;
await emit({ ...item, ok: false, error: toErrorRecord(err) });
}
}
}

const workerCount = Math.max(1, Math.floor(concurrency));
await Promise.all(Array.from({ length: workerCount }, () => drain()));
await emitLock;

return summary;
}
13 changes: 13 additions & 0 deletions src/batch/services/to-error-record.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { BatchErrorRecord } from "../types/batch-result.js";

/** Normalise any thrown value into a `{ class, message }` error record. */
export function toErrorRecord(err: unknown): BatchErrorRecord {
if (err instanceof Error) {
return {
class: err.constructor.name || err.name || "Error",
message: err.message,
};
}

return { class: "Error", message: String(err) };
}
25 changes: 25 additions & 0 deletions src/batch/types/batch-result.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type { BatchItem } from "./batch-item.js";

export interface BatchErrorRecord {
/** Error class name, e.g. "ValidationError". */
class: string;
message: string;
}

export interface BatchSuccess extends BatchItem {
data: unknown;
ok: true;
}

export interface BatchFailure extends BatchItem {
error: BatchErrorRecord;
ok: false;
}

export type BatchResult = BatchSuccess | BatchFailure;

export interface BatchSummary {
failed: number;
succeeded: number;
total: number;
}
86 changes: 86 additions & 0 deletions tests/batch/run-batch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { ValidationError } from "@decodo/sdk-ts";
import { describe, expect, it } from "vitest";
import { runBatch } from "../../src/batch/services/run-batch.js";
import type { BatchItem } from "../../src/batch/types/batch-item.js";
import type { BatchResult } from "../../src/batch/types/batch-result.js";

async function* toAsync(inputs: string[]): AsyncGenerator<BatchItem> {
let index = 0;
for (const input of inputs) {
await Promise.resolve();
yield { index, input };
index++;
}
}

function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

describe("runBatch", () => {
it("runs every item and emits success records", async () => {
const results: BatchResult[] = [];

const summary = await runBatch({
items: toAsync(["a", "b", "c"]),
concurrency: 2,
worker: (item) => Promise.resolve(item.input.toUpperCase()),
onResult: (result) => {
results.push(result);
},
});

expect(summary).toEqual({ total: 3, succeeded: 3, failed: 0 });
expect(results.map((r) => r.index).sort()).toEqual([0, 1, 2]);
const a = results.find((r) => r.input === "a");
expect(a).toMatchObject({ ok: true, data: "A" });
});

it("captures per-item errors without aborting the batch", async () => {
const results: BatchResult[] = [];

const summary = await runBatch({
items: toAsync(["ok-1", "boom", "ok-2"]),
concurrency: 3,
worker: (item) => {
if (item.input === "boom") {
throw new ValidationError("bad input");
}
return Promise.resolve(item.input);
},
onResult: (result) => {
results.push(result);
},
});

expect(summary).toEqual({ total: 3, succeeded: 2, failed: 1 });
const failure = results.find((r) => !r.ok);
expect(failure).toMatchObject({
ok: false,
input: "boom",
error: { class: "ValidationError", message: "bad input" },
});
});

it("never exceeds the configured concurrency", async () => {
let active = 0;
let peak = 0;

await runBatch({
items: toAsync(["1", "2", "3", "4", "5", "6"]),
concurrency: 2,
worker: async () => {
active++;
peak = Math.max(peak, active);
await delay(5);
active--;
},
onResult: () => {
// no-op
},
});

expect(peak).toBeLessThanOrEqual(2);
expect(peak).toBeGreaterThan(0);
});
});
Loading