From ac2b82816571d635a11a318c07f68fef737dc4c4 Mon Sep 17 00:00:00 2001
From: Paulius Krutkis
Date: Tue, 9 Jun 2026 15:20:30 +0300
Subject: [PATCH] A6.1: Stream batch inputs from --input-file (.txt + CSV)

Add the batch input-parsing layer: lazily read inputs from a file, one
per item, without buffering the whole file in memory.

- .txt: one trimmed input per line, blank lines skipped.
- .csv: requires --input-column; quote-aware line parser handles commas
  and escaped quotes inside fields.
- Usage/validation errors for missing column flag and unknown column.

Unit tests only; no SDK calls (SCR-3178).

Co-Authored-By: Claude Opus 4.8 (1M context)
---
Reviewer notes (not part of the commit message):
- Restored the generic type arguments that were missing from the
  AsyncIterable / AsyncGenerator / Promise annotations in
  read-input-file.ts and read-input-file.test.ts. Bare AsyncIterable and
  Promise do not type-check, and a bare AsyncGenerator yields `unknown`.
  The fixes are in-line, so every hunk header below still matches.
- The `index` blob abbreviations still refer to the pre-review content.

 src/batch/constants.ts                |  3 +
 src/batch/services/parse-csv-line.ts  | 41 +++++++++++
 src/batch/services/read-input-file.ts | 98 +++++++++++++++++++++++++++
 src/batch/types/batch-item.ts         |  6 ++
 tests/batch/parse-csv-line.test.ts    | 23 +++++++
 tests/batch/read-input-file.test.ts   | 81 ++++++++++++++++++++++
 6 files changed, 252 insertions(+)
 create mode 100644 src/batch/constants.ts
 create mode 100644 src/batch/services/parse-csv-line.ts
 create mode 100644 src/batch/services/read-input-file.ts
 create mode 100644 src/batch/types/batch-item.ts
 create mode 100644 tests/batch/parse-csv-line.test.ts
 create mode 100644 tests/batch/read-input-file.test.ts

diff --git a/src/batch/constants.ts b/src/batch/constants.ts
new file mode 100644
index 0000000..9ab1014
--- /dev/null
+++ b/src/batch/constants.ts
@@ -0,0 +1,3 @@
+export const DEFAULT_CONCURRENCY = 4;
+
+export const CSV_EXTENSION = ".csv";
diff --git a/src/batch/services/parse-csv-line.ts b/src/batch/services/parse-csv-line.ts
new file mode 100644
index 0000000..854be70
--- /dev/null
+++ b/src/batch/services/parse-csv-line.ts
@@ -0,0 +1,41 @@
+/**
+ * Split a single CSV line into fields. Handles double-quoted fields and
+ * escaped quotes (`""`). Embedded newlines inside quoted fields are not
+ * supported — inputs are parsed one physical line at a time so the file can be
+ * read streamingly without buffering the whole document.
+ */
+export function parseCsvLine(line: string): string[] {
+  const fields: string[] = [];
+  let current = "";
+  let inQuotes = false;
+
+  for (let i = 0; i < line.length; i++) {
+    const char = line[i];
+
+    if (inQuotes) {
+      if (char === '"') {
+        if (line[i + 1] === '"') {
+          current += '"';
+          i++;
+        } else {
+          inQuotes = false;
+        }
+      } else {
+        current += char;
+      }
+      continue;
+    }
+
+    if (char === '"') {
+      inQuotes = true;
+    } else if (char === ",") {
+      fields.push(current);
+      current = "";
+    } else {
+      current += char;
+    }
+  }
+
+  fields.push(current);
+  return fields;
+}
diff --git a/src/batch/services/read-input-file.ts b/src/batch/services/read-input-file.ts
new file mode 100644
index 0000000..6428fb5
--- /dev/null
+++ b/src/batch/services/read-input-file.ts
@@ -0,0 +1,98 @@
+import { createReadStream } from "node:fs";
+import { extname } from "node:path";
+import { createInterface } from "node:readline";
+import { ValidationError } from "@decodo/sdk-ts";
+import { CliUsageError } from "../../platform/services/handle-cli-error.js";
+import { CSV_EXTENSION } from "../constants.js";
+import type { BatchItem } from "../types/batch-item.js";
+import { parseCsvLine } from "./parse-csv-line.js";
+
+export interface ReadInputFileOptions {
+  /** Required column name when the input file is a CSV. */
+  inputColumn?: string;
+}
+
+function isCsv(path: string): boolean {
+  return extname(path).toLowerCase() === CSV_EXTENSION;
+}
+
+function readLines(path: string): AsyncIterable<string> {
+  const stream = createReadStream(path, { encoding: "utf8" });
+  return createInterface({
+    input: stream,
+    crlfDelay: Number.POSITIVE_INFINITY,
+  });
+}
+
+async function* readTxt(path: string): AsyncGenerator<BatchItem> {
+  let index = 0;
+  for await (const line of readLines(path)) {
+    const input = line.trim();
+    if (input.length === 0) {
+      continue;
+    }
+    yield { index, input };
+    index++;
+  }
+}
+
+function resolveColumnIndex(header: string[], column: string): number {
+  const columnIndex = header.findIndex((name) => name.trim() === column);
+  if (columnIndex === -1) {
+    throw new ValidationError(
+      `Column "${column}" not found in CSV header: ${header.join(", ")}`
+    );
+  }
+  return columnIndex;
+}
+
+async function* readCsv(
+  path: string,
+  column: string
+): AsyncGenerator<BatchItem> {
+  let columnIndex: number | undefined;
+  let index = 0;
+
+  for await (const line of readLines(path)) {
+    if (line.trim().length === 0) {
+      continue;
+    }
+
+    const fields = parseCsvLine(line);
+
+    if (columnIndex === undefined) {
+      columnIndex = resolveColumnIndex(fields, column);
+      continue;
+    }
+
+    const input = fields[columnIndex]?.trim() ?? "";
+    if (input.length === 0) {
+      continue;
+    }
+
+    yield { index, input };
+    index++;
+  }
+}
+
+/**
+ * Stream inputs from a batch input file. `.csv` files require `inputColumn`;
+ * any other extension is treated as plain text with one input per line. Blank
+ * lines are skipped. The file is read lazily so multi-MB inputs are never fully
+ * buffered in memory.
+ */
+export function readInputFile(
+  path: string,
+  options: ReadInputFileOptions = {}
+): AsyncGenerator<BatchItem> {
+  if (isCsv(path)) {
+    if (!options.inputColumn) {
+      throw new CliUsageError(
+        "--input-column is required when --input-file is a CSV."
+      );
+    }
+    return readCsv(path, options.inputColumn);
+  }
+
+  return readTxt(path);
+}
diff --git a/src/batch/types/batch-item.ts b/src/batch/types/batch-item.ts
new file mode 100644
index 0000000..4b115d2
--- /dev/null
+++ b/src/batch/types/batch-item.ts
@@ -0,0 +1,6 @@
+export interface BatchItem {
+  /** Zero-based index of the item among the emitted inputs. */
+  index: number;
+  /** The resolved input (URL or query) for this item. */
+  input: string;
+}
diff --git a/tests/batch/parse-csv-line.test.ts b/tests/batch/parse-csv-line.test.ts
new file mode 100644
index 0000000..0f5fcc1
--- /dev/null
+++ b/tests/batch/parse-csv-line.test.ts
@@ -0,0 +1,23 @@
+import { describe, expect, it } from "vitest";
+import { parseCsvLine } from "../../src/batch/services/parse-csv-line.js";
+
+describe("parseCsvLine", () => {
+  it("splits a plain comma-separated line", () => {
+    expect(parseCsvLine("a,b,c")).toEqual(["a", "b", "c"]);
+  });
+
+  it("preserves commas inside quoted fields", () => {
+    expect(parseCsvLine('"https://x.com/?a=1,2",b')).toEqual([
+      "https://x.com/?a=1,2",
+      "b",
+    ]);
+  });
+
+  it("unescapes doubled quotes inside quoted fields", () => {
+    expect(parseCsvLine('"say ""hi""",next')).toEqual(['say "hi"', "next"]);
+  });
+
+  it("keeps empty trailing and leading fields", () => {
+    expect(parseCsvLine(",a,")).toEqual(["", "a", ""]);
+  });
+});
diff --git a/tests/batch/read-input-file.test.ts b/tests/batch/read-input-file.test.ts
new file mode 100644
index 0000000..872042f
--- /dev/null
+++ b/tests/batch/read-input-file.test.ts
@@ -0,0 +1,81 @@
+import { mkdtempSync, rmSync, writeFileSync } from "node:fs";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+import { ValidationError } from "@decodo/sdk-ts";
+import { afterAll, beforeAll, describe, expect, it } from "vitest";
+import { readInputFile } from "../../src/batch/services/read-input-file.js";
+import type { BatchItem } from "../../src/batch/types/batch-item.js";
+import { CliUsageError } from "../../src/platform/services/handle-cli-error.js";
+
+let dir: string;
+
+function writeFixture(name: string, content: string): string {
+  const path = join(dir, name);
+  writeFileSync(path, content, "utf8");
+  return path;
+}
+
+async function collect(
+  iterable: AsyncIterable<BatchItem>
+): Promise<BatchItem[]> {
+  const items: BatchItem[] = [];
+  for await (const item of iterable) {
+    items.push(item);
+  }
+  return items;
+}
+
+beforeAll(() => {
+  dir = mkdtempSync(join(tmpdir(), "batch-input-"));
+});
+
+afterAll(() => {
+  rmSync(dir, { recursive: true, force: true });
+});
+
+describe("readInputFile (.txt)", () => {
+  it("yields one trimmed input per non-empty line", async () => {
+    const path = writeFixture(
+      "urls.txt",
+      "https://a.com\n https://b.com \n\n\nhttps://c.com\n"
+    );
+
+    const items = await collect(readInputFile(path));
+
+    expect(items).toEqual([
+      { index: 0, input: "https://a.com" },
+      { index: 1, input: "https://b.com" },
+      { index: 2, input: "https://c.com" },
+    ]);
+  });
+});
+
+describe("readInputFile (.csv)", () => {
+  it("extracts the named column and skips the header", async () => {
+    const path = writeFixture(
+      "input.csv",
+      "id,url,note\n1,https://a.com,first\n2,https://b.com,second\n"
+    );
+
+    const items = await collect(readInputFile(path, { inputColumn: "url" }));
+
+    expect(items).toEqual([
+      { index: 0, input: "https://a.com" },
+      { index: 1, input: "https://b.com" },
+    ]);
+  });
+
+  it("requires --input-column for CSV input", () => {
+    const path = writeFixture("missing-column.csv", "url\nhttps://a.com\n");
+
+    expect(() => readInputFile(path)).toThrow(CliUsageError);
+  });
+
+  it("throws when the named column is absent from the header", async () => {
+    const path = writeFixture("bad-header.csv", "id,note\n1,first\n");
+
+    await expect(
+      collect(readInputFile(path, { inputColumn: "url" }))
+    ).rejects.toThrow(ValidationError);
+  });
+});