Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/batch/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/**
 * Default concurrency value for batch processing.
 * NOTE(review): the consumer of this constant is not visible here — confirm
 * exactly what it bounds (e.g. number of in-flight items).
 */
export const DEFAULT_CONCURRENCY = 4;

/**
 * File extension (lowercase, including the leading dot) that marks an input
 * file as CSV.
 */
export const CSV_EXTENSION = ".csv";
41 changes: 41 additions & 0 deletions src/batch/services/parse-csv-line.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
 * Tokenise one physical CSV line into its fields.
 *
 * Rules applied while scanning left to right:
 * - a double quote toggles "quoted" mode and is not copied into the field;
 * - inside quoted mode, a doubled quote (`""`) yields one literal `"`;
 * - a comma outside quoted mode ends the current field;
 * - everything else is copied verbatim (no trimming).
 *
 * Newlines embedded in quoted fields are not supported: callers feed this
 * function one line at a time so the file can be streamed without buffering
 * the whole document. A quote that is never closed simply runs to the end of
 * the line.
 *
 * @param line - A single line of CSV text, without its line terminator.
 * @returns The fields in order; always contains at least one element.
 */
export function parseCsvLine(line: string): string[] {
  const QUOTE = '"';
  const SEPARATOR = ",";

  const result: string[] = [];
  let buffer = "";
  let quoted = false;
  let pos = 0;

  while (pos < line.length) {
    const ch = line.charAt(pos);
    pos += 1;

    if (ch === QUOTE) {
      if (!quoted) {
        // Opening quote: switch mode, emit nothing.
        quoted = true;
      } else if (line.charAt(pos) === QUOTE) {
        // Escaped quote (`""`): emit one quote and consume the second.
        buffer += QUOTE;
        pos += 1;
      } else {
        // Closing quote.
        quoted = false;
      }
      continue;
    }

    if (ch === SEPARATOR && !quoted) {
      result.push(buffer);
      buffer = "";
      continue;
    }

    buffer += ch;
  }

  // The last field has no trailing separator, so flush it explicitly.
  result.push(buffer);
  return result;
}
98 changes: 98 additions & 0 deletions src/batch/services/read-input-file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { createReadStream } from "node:fs";
import { extname } from "node:path";
import { createInterface } from "node:readline";
import { ValidationError } from "@decodo/sdk-ts";
import { CliUsageError } from "../../platform/services/handle-cli-error.js";
import { CSV_EXTENSION } from "../constants.js";
import type { BatchItem } from "../types/batch-item.js";
import { parseCsvLine } from "./parse-csv-line.js";

/** Options accepted by `readInputFile`. */
export interface ReadInputFileOptions {
/**
* Name of the CSV header column whose values are used as inputs. Required
* when the input file has a `.csv` extension; not consulted for other files.
*/
inputColumn?: string;
}

/**
 * Tell whether `path` names a CSV file, judged solely by its extension
 * (compared case-insensitively against `CSV_EXTENSION`).
 */
function isCsv(path: string): boolean {
  const extension = extname(path);
  return extension.toLowerCase() === CSV_EXTENSION;
}

/**
 * Lazily read a UTF-8 text file line by line.
 *
 * The file is opened when iteration starts. Closing a readline interface does
 * not close its input stream, so the underlying file stream is destroyed
 * explicitly once iteration ends for any reason — normal completion, an early
 * `break`/`return` in the consumer, or an error thrown while iterating.
 * Without that, stopping early would leave the file descriptor open.
 *
 * @param path - Path of the file to read.
 * @returns An async iterable yielding each line without its terminator.
 */
async function* readLines(path: string): AsyncIterable<string> {
  const stream = createReadStream(path, { encoding: "utf8" });
  const lines = createInterface({
    input: stream,
    // Always treat "\r\n" as a single line break, however slowly it arrives.
    crlfDelay: Number.POSITIVE_INFINITY,
  });

  try {
    yield* lines;
  } finally {
    // Both calls are no-ops when the interface/stream has already finished.
    lines.close();
    stream.destroy();
  }
}

/**
 * Stream batch items from a plain-text file: one input per line, trimmed,
 * with blank (whitespace-only) lines skipped. `index` counts emitted items
 * only, starting at zero.
 */
async function* readTxt(path: string): AsyncGenerator<BatchItem> {
  let emitted = 0;
  for await (const rawLine of readLines(path)) {
    const trimmed = rawLine.trim();
    if (trimmed !== "") {
      yield { index: emitted, input: trimmed };
      emitted += 1;
    }
  }
}

/**
 * Find the position of `column` within a parsed CSV header row.
 *
 * Header cells are trimmed before comparison; `column` is compared as given
 * and the match is case-sensitive. The first matching cell wins.
 *
 * @throws ValidationError when no header cell matches `column`.
 */
function resolveColumnIndex(header: string[], column: string): number {
  for (const [position, cell] of header.entries()) {
    if (cell.trim() === column) {
      return position;
    }
  }

  const available = header.join(", ");
  throw new ValidationError(
    `Column "${column}" not found in CSV header: ${available}`
  );
}

/**
 * Stream batch items from a CSV file, taking each item's input from the
 * column named `column`.
 *
 * The first non-blank line is treated as the header and is used only to
 * locate the column. Blank lines and rows whose target cell is missing or
 * empty after trimming are skipped. `index` counts emitted items only.
 */
async function* readCsv(
  path: string,
  column: string
): AsyncGenerator<BatchItem> {
  let headerSeen = false;
  let targetColumn = -1;
  let emitted = 0;

  for await (const rawLine of readLines(path)) {
    if (rawLine.trim() === "") {
      continue;
    }

    const cells = parseCsvLine(rawLine);

    if (!headerSeen) {
      // Header row: remember where the column lives, emit nothing.
      targetColumn = resolveColumnIndex(cells, column);
      headerSeen = true;
      continue;
    }

    // Short rows have no cell at the target position; treat that as empty.
    const value = (cells[targetColumn] ?? "").trim();
    if (value !== "") {
      yield { index: emitted, input: value };
      emitted += 1;
    }
  }
}

/**
 * Open a batch input file as a lazy stream of items.
 *
 * Files with a `.csv` extension are read as CSV and need
 * `options.inputColumn`; every other extension is read as plain text, one
 * input per line. Blank lines are skipped in both modes. Nothing is read
 * until the returned generator is iterated, so large files are never held in
 * memory in full.
 *
 * @throws CliUsageError synchronously, before any I/O, when a CSV path is
 *   given without `inputColumn`.
 */
export function readInputFile(
  path: string,
  options: ReadInputFileOptions = {}
): AsyncGenerator<BatchItem> {
  if (!isCsv(path)) {
    return readTxt(path);
  }

  const column = options.inputColumn;
  if (column) {
    return readCsv(path, column);
  }

  throw new CliUsageError(
    "--input-column is required when --input-file is a CSV."
  );
}
6 changes: 6 additions & 0 deletions src/batch/types/batch-item.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/** One unit of work read from a batch input file. */
export interface BatchItem {
/** Zero-based index of the item among the emitted inputs. */
index: number;
/** The resolved input (URL or query) for this item. */
input: string;
}
23 changes: 23 additions & 0 deletions tests/batch/parse-csv-line.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { describe, expect, it } from "vitest";
import { parseCsvLine } from "../../src/batch/services/parse-csv-line.js";

// Unit tests for parseCsvLine: plain splitting, quoted commas, escaped quotes
// and preservation of empty fields.
describe("parseCsvLine", () => {
it("splits a plain comma-separated line", () => {
expect(parseCsvLine("a,b,c")).toEqual(["a", "b", "c"]);
});

// A comma inside double quotes must not act as a field separator, and the
// surrounding quotes are dropped from the result.
it("preserves commas inside quoted fields", () => {
expect(parseCsvLine('"https://x.com/?a=1,2",b')).toEqual([
"https://x.com/?a=1,2",
"b",
]);
});

// Inside a quoted field, `""` stands for one literal quote character.
it("unescapes doubled quotes inside quoted fields", () => {
expect(parseCsvLine('"say ""hi""",next')).toEqual(['say "hi"', "next"]);
});

// Leading and trailing separators produce empty-string fields.
it("keeps empty trailing and leading fields", () => {
expect(parseCsvLine(",a,")).toEqual(["", "a", ""]);
});
});
81 changes: 81 additions & 0 deletions tests/batch/read-input-file.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { ValidationError } from "@decodo/sdk-ts";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { readInputFile } from "../../src/batch/services/read-input-file.js";
import type { BatchItem } from "../../src/batch/types/batch-item.js";
import { CliUsageError } from "../../src/platform/services/handle-cli-error.js";

/** Temporary directory that holds the fixture files written by these tests. */
let dir: string;

/**
 * Write `content` as UTF-8 to a file called `name` inside the temporary
 * directory and return the file's full path.
 */
function writeFixture(name: string, content: string): string {
  const target = join(dir, name);
  writeFileSync(target, content, { encoding: "utf8" });
  return target;
}

/** Drain an async iterable of batch items into an array, preserving order. */
async function collect(
  iterable: AsyncIterable<BatchItem>
): Promise<BatchItem[]> {
  const collected: BatchItem[] = [];
  const iterator = iterable[Symbol.asyncIterator]();

  let step = await iterator.next();
  while (!step.done) {
    collected.push(step.value);
    step = await iterator.next();
  }

  return collected;
}

// Create a fresh temporary directory once, before any test in this file runs.
beforeAll(() => {
dir = mkdtempSync(join(tmpdir(), "batch-input-"));
});

// Remove the temporary directory and every fixture in it after all tests;
// `force` keeps cleanup from failing if the directory is already gone.
afterAll(() => {
rmSync(dir, { recursive: true, force: true });
});

// Plain-text mode: any non-`.csv` extension is read one input per line.
describe("readInputFile (.txt)", () => {
it("yields one trimmed input per non-empty line", async () => {
// Fixture mixes surrounding whitespace and consecutive blank lines.
const path = writeFixture(
"urls.txt",
"https://a.com\n https://b.com \n\n\nhttps://c.com\n"
);

const items = await collect(readInputFile(path));

// Indices are consecutive over emitted items; blank lines do not consume one.
expect(items).toEqual([
{ index: 0, input: "https://a.com" },
{ index: 1, input: "https://b.com" },
{ index: 2, input: "https://c.com" },
]);
});
});

// CSV mode: selected by the `.csv` extension and driven by `inputColumn`.
describe("readInputFile (.csv)", () => {
it("extracts the named column and skips the header", async () => {
const path = writeFixture(
"input.csv",
"id,url,note\n1,https://a.com,first\n2,https://b.com,second\n"
);

const items = await collect(readInputFile(path, { inputColumn: "url" }));

// The header row is not emitted, so the first data row gets index 0.
expect(items).toEqual([
{ index: 0, input: "https://a.com" },
{ index: 1, input: "https://b.com" },
]);
});

// The usage error is raised by the readInputFile call itself, before any
// iteration, which is why a plain synchronous `toThrow` is used here.
it("requires --input-column for CSV input", () => {
const path = writeFixture("missing-column.csv", "url\nhttps://a.com\n");

expect(() => readInputFile(path)).toThrow(CliUsageError);
});

// The header is only inspected once iteration starts, so this failure
// surfaces as a rejected promise rather than a synchronous throw.
it("throws when the named column is absent from the header", async () => {
const path = writeFixture("bad-header.csv", "id,note\n1,first\n");

await expect(
collect(readInputFile(path, { inputColumn: "url" }))
).rejects.toThrow(ValidationError);
});
});
Loading