From 8edaff60b06a1fbeed12210f73477d99339c59a3 Mon Sep 17 00:00:00 2001
From: Paulius Krutkis
Date: Tue, 9 Jun 2026 15:24:06 +0300
Subject: [PATCH] A6.3: Batch output sinks (ndjson stdout + per-item files)

Add the two batch output destinations on top of the executor:

- createNdjsonStdoutSink: one JSON record per line on stdout, regardless of
  TTY (the batch default).
- createDirectorySink: one .json file per item into a directory (created if
  needed); names derived from the input URL slug or row index via
  batchItemFilename, deduped on collision (compared case-insensitively).

Both share toBatchRecord so a record looks identical whether streamed or
written to a file. Successes carry `result`, failures carry `error`.

Depends on A6.2 (SCR-3180).

Co-Authored-By: Claude Opus 4.8 (1M context)
---
Review note: the blob ids on the `index` lines below predate the review
edits made to this patch; regenerate it with `git format-patch` after
applying if exact ids matter.

 src/batch/services/batch-item-filename.ts | 32 ++++++++++
 src/batch/services/batch-record.ts        | 24 ++++++++
 src/batch/services/directory-sink.ts      | 52 +++++++++++++++++
 src/batch/services/ndjson-stdout-sink.ts  | 14 +++++
 src/batch/types/batch-sink.ts             |  7 +++
 tests/batch/batch-item-filename.test.ts   | 25 ++++++++
 tests/batch/directory-sink.test.ts        | 71 +++++++++++++++++++++++
 tests/batch/ndjson-stdout-sink.test.ts    | 38 ++++++++++++
 8 files changed, 263 insertions(+)
 create mode 100644 src/batch/services/batch-item-filename.ts
 create mode 100644 src/batch/services/batch-record.ts
 create mode 100644 src/batch/services/directory-sink.ts
 create mode 100644 src/batch/services/ndjson-stdout-sink.ts
 create mode 100644 src/batch/types/batch-sink.ts
 create mode 100644 tests/batch/batch-item-filename.test.ts
 create mode 100644 tests/batch/directory-sink.test.ts
 create mode 100644 tests/batch/ndjson-stdout-sink.test.ts

diff --git a/src/batch/services/batch-item-filename.ts b/src/batch/services/batch-item-filename.ts
new file mode 100644
index 0000000..51294a6
--- /dev/null
+++ b/src/batch/services/batch-item-filename.ts
@@ -0,0 +1,32 @@
+const MAX_SLUG_LENGTH = 100;
+
+/**
+ * Replace each run of characters outside `[a-zA-Z0-9.-]` with a single `-` and
+ * cap the result at MAX_SLUG_LENGTH. Trailing dashes are stripped after the
+ * cut, so a truncated slug never ends on a separator.
+ */
+function slugify(value: string): string {
+  return value
+    .replace(/[^a-zA-Z0-9.-]+/g, "-")
+    .replace(/^-+/, "")
+    .slice(0, MAX_SLUG_LENGTH)
+    .replace(/-+$/, "");
+}
+
+/**
+ * Derive a filesystem-safe base name (no extension) for a batch item. URLs are
+ * slugged from host + path; anything else falls back to the row index.
+ */
+export function batchItemFilename(input: string, index: number): string {
+  try {
+    const url = new URL(input);
+    const slug = slugify(`${url.hostname}${url.pathname}`);
+    if (slug.length > 0) {
+      return slug;
+    }
+  } catch {
+    // Not a URL — fall back to the row index below.
+  }
+
+  return `item-${index}`;
+}
diff --git a/src/batch/services/batch-record.ts b/src/batch/services/batch-record.ts
new file mode 100644
index 0000000..e52cb02
--- /dev/null
+++ b/src/batch/services/batch-record.ts
@@ -0,0 +1,24 @@
+import type { BatchResult } from "../types/batch-result.js";
+
+/**
+ * Serialized form of one batch item. toBatchRecord fills `result` for
+ * successes and `error` for failures.
+ */
+export interface BatchRecord {
+  error?: { class: string; message: string };
+  index: number;
+  input: string;
+  result?: unknown;
+}
+
+/**
+ * Shape a settled batch result into the record that is streamed to stdout or
+ * written per item. Successes carry `result`; failures carry `error`.
+ */
+export function toBatchRecord(result: BatchResult): BatchRecord {
+  if (result.ok) {
+    return { index: result.index, input: result.input, result: result.data };
+  }
+
+  return { index: result.index, input: result.input, error: result.error };
+}
diff --git a/src/batch/services/directory-sink.ts b/src/batch/services/directory-sink.ts
new file mode 100644
index 0000000..b0ee8c8
--- /dev/null
+++ b/src/batch/services/directory-sink.ts
@@ -0,0 +1,52 @@
+import { mkdirSync, writeFileSync } from "node:fs";
+import { join } from "node:path";
+import type { BatchResult } from "../types/batch-result.js";
+import type { BatchSink } from "../types/batch-sink.js";
+import { batchItemFilename } from "./batch-item-filename.js";
+import { toBatchRecord } from "./batch-record.js";
+
+export interface DirectorySinkOptions {
+  /** Indent each file with two spaces instead of writing compact JSON. */
+  pretty?: boolean;
+}
+
+/**
+ * Reserve a file name that has not been handed out yet: `base` when it is
+ * free, otherwise `base-2`, `base-3`, ... Names are compared ignoring case so
+ * two items never share a file on a case-insensitive filesystem.
+ */
+function uniqueName(base: string, used: Set<string>): string {
+  let candidate = base;
+  for (let suffix = 2; used.has(candidate.toLowerCase()); suffix++) {
+    candidate = `${base}-${suffix}`;
+  }
+  used.add(candidate.toLowerCase());
+  return candidate;
+}
+
+/**
+ * Per-item sink: writes one `.json` file per result into `dir` (created
+ * if needed). File names come from the input URL slug or row index, deduped on
+ * collision. Each file holds the same record that the stdout sink would emit.
+ */
+export function createDirectorySink(
+  dir: string,
+  options: DirectorySinkOptions = {}
+): BatchSink {
+  mkdirSync(dir, { recursive: true });
+  const used = new Set<string>();
+  const indent = options.pretty ? 2 : undefined;
+
+  return {
+    write(result: BatchResult): void {
+      const base = batchItemFilename(result.input, result.index);
+      const name = uniqueName(base, used);
+      const record = toBatchRecord(result);
+      writeFileSync(
+        join(dir, `${name}.json`),
+        JSON.stringify(record, null, indent),
+        "utf8"
+      );
+    },
+  };
+}
diff --git a/src/batch/services/ndjson-stdout-sink.ts b/src/batch/services/ndjson-stdout-sink.ts
new file mode 100644
index 0000000..21977e8
--- /dev/null
+++ b/src/batch/services/ndjson-stdout-sink.ts
@@ -0,0 +1,14 @@
+import type { BatchResult } from "../types/batch-result.js";
+import type { BatchSink } from "../types/batch-sink.js";
+import { toBatchRecord } from "./batch-record.js";
+
+/**
+ * Batch default sink: one JSON record per line on stdout, regardless of TTY.
+ */
+export function createNdjsonStdoutSink(): BatchSink {
+  return {
+    write(result: BatchResult): void {
+      process.stdout.write(`${JSON.stringify(toBatchRecord(result))}\n`);
+    },
+  };
+}
diff --git a/src/batch/types/batch-sink.ts b/src/batch/types/batch-sink.ts
new file mode 100644
index 0000000..b573318
--- /dev/null
+++ b/src/batch/types/batch-sink.ts
@@ -0,0 +1,7 @@
+import type { BatchResult } from "./batch-result.js";
+
+/** Destination for batch results — stdout (ndjson) or a directory of files. */
+export interface BatchSink {
+  close?(): void | Promise<void>;
+  write(result: BatchResult): void | Promise<void>;
+}
diff --git a/tests/batch/batch-item-filename.test.ts b/tests/batch/batch-item-filename.test.ts
new file mode 100644
index 0000000..8ab86ca
--- /dev/null
+++ b/tests/batch/batch-item-filename.test.ts
@@ -0,0 +1,25 @@
+import { describe, expect, it } from "vitest";
+import { batchItemFilename } from "../../src/batch/services/batch-item-filename.js";
+
+describe("batchItemFilename", () => {
+  it("slugs a URL from host and path, dropping the query", () => {
+    expect(batchItemFilename("https://example.com/a/b?x=1", 0)).toBe(
+      "example.com-a-b"
+    );
+  });
+
+  it("falls back to the row index for non-URL input", () => {
+    expect(batchItemFilename("how to scrape", 3)).toBe("item-3");
+  });
+
+  it("truncates very long slugs", () => {
+    const long = `https://example.com/${"a".repeat(300)}`;
+    expect(batchItemFilename(long, 0).length).toBeLessThanOrEqual(100);
+  });
+
+  it("trims a separator left dangling by truncation", () => {
+    // Host (11) + "-" + 87 chars = 99, so the 100-char cut ends on a "-".
+    const input = `https://example.com/${"a".repeat(87)}/tail`;
+    expect(batchItemFilename(input, 0)).toBe(`example.com-${"a".repeat(87)}`);
+  });
+});
diff --git a/tests/batch/directory-sink.test.ts b/tests/batch/directory-sink.test.ts
new file mode 100644
index 0000000..35e1776
--- /dev/null
+++ b/tests/batch/directory-sink.test.ts
@@ -0,0 +1,71 @@
+import { mkdtempSync, readdirSync, readFileSync, rmSync } from "node:fs";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+import { afterEach, beforeEach, describe, expect, it } from "vitest";
+import { createDirectorySink } from "../../src/batch/services/directory-sink.js";
+
+let dir: string;
+
+beforeEach(() => {
+  dir = mkdtempSync(join(tmpdir(), "batch-dir-sink-"));
+});
+
+afterEach(() => {
+  rmSync(dir, { recursive: true, force: true });
+});
+
+describe("createDirectorySink", () => {
+  it("writes one record file per item, named from the input", () => {
+    const outDir = join(dir, "out");
+    const sink = createDirectorySink(outDir);
+
+    sink.write({ index: 0, input: "https://a.com/x", ok: true, data: "A" });
+    sink.write({
+      index: 1,
+      input: "https://b.com",
+      ok: false,
+      error: { class: "TimeoutError", message: "timed out" },
+    });
+
+    const files = readdirSync(outDir).sort();
+    expect(files).toEqual(["a.com-x.json", "b.com.json"]);
+
+    const success = JSON.parse(
+      readFileSync(join(outDir, "a.com-x.json"), "utf8")
+    );
+    expect(success).toEqual({
+      index: 0,
+      input: "https://a.com/x",
+      result: "A",
+    });
+
+    const failure = JSON.parse(
+      readFileSync(join(outDir, "b.com.json"), "utf8")
+    );
+    expect(failure).toEqual({
+      index: 1,
+      input: "https://b.com",
+      error: { class: "TimeoutError", message: "timed out" },
+    });
+  });
+
+  it("dedupes colliding names with a numeric suffix", () => {
+    const sink = createDirectorySink(dir);
+
+    sink.write({ index: 0, input: "not a url", ok: true, data: 1 });
+    sink.write({ index: 0, input: "also not a url", ok: true, data: 2 });
+
+    const files = readdirSync(dir).sort();
+    expect(files).toEqual(["item-0-2.json", "item-0.json"]);
+  });
+
+  it("treats names that differ only by case as collisions", () => {
+    const sink = createDirectorySink(dir);
+
+    sink.write({ index: 0, input: "https://a.com/Page", ok: true, data: 1 });
+    sink.write({ index: 1, input: "https://a.com/page", ok: true, data: 2 });
+
+    const files = readdirSync(dir).sort();
+    expect(files).toEqual(["a.com-Page.json", "a.com-page-2.json"]);
+  });
+});
diff --git a/tests/batch/ndjson-stdout-sink.test.ts b/tests/batch/ndjson-stdout-sink.test.ts
new file mode 100644
index 0000000..ef3a909
--- /dev/null
+++ b/tests/batch/ndjson-stdout-sink.test.ts
@@ -0,0 +1,38 @@
+import { afterEach, describe, expect, it, vi } from "vitest";
+import { createNdjsonStdoutSink } from "../../src/batch/services/ndjson-stdout-sink.js";
+
+afterEach(() => {
+  vi.restoreAllMocks();
+});
+
+describe("createNdjsonStdoutSink", () => {
+  it("writes one JSON line per result to stdout", () => {
+    const lines: string[] = [];
+    vi.spyOn(process.stdout, "write").mockImplementation((chunk: unknown) => {
+      lines.push(String(chunk));
+      return true;
+    });
+
+    const sink = createNdjsonStdoutSink();
+    sink.write({ index: 0, input: "https://a.com", ok: true, data: { ok: 1 } });
+    sink.write({
+      index: 1,
+      input: "https://b.com",
+      ok: false,
+      error: { class: "ValidationError", message: "nope" },
+    });
+
+    expect(lines).toHaveLength(2);
+    expect(lines[0].endsWith("\n")).toBe(true);
+    expect(JSON.parse(lines[0])).toEqual({
+      index: 0,
+      input: "https://a.com",
+      result: { ok: 1 },
+    });
+    expect(JSON.parse(lines[1])).toEqual({
+      index: 1,
+      input: "https://b.com",
+      error: { class: "ValidationError", message: "nope" },
+    });
+  });
+});