Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/batch/commands/attach-batch-options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { Command } from "commander";
import { parseConcurrency } from "../services/resolve-concurrency.js";

/**
 * Register the batch-mode flags (`--input-file`, `--input-column`,
 * `--concurrency`) on a scrape-style command. When batch mode is active the
 * command's existing `-o, --output` flag is treated as an output directory.
 *
 * @param command - Command to extend.
 * @returns The value returned by the final `.option()` call in the chain.
 */
export function attachBatchOptions(command: Command): Command {
  const withInputFile = command.option(
    "--input-file <path>",
    "Run each line/row of a .txt or .csv file as a batch item"
  );

  const withInputColumn = withInputFile.option(
    "--input-column <name>",
    "Column to read inputs from when --input-file is a CSV"
  );

  // The raw flag value is run through `parseConcurrency` by commander.
  return withInputColumn.option(
    "--concurrency <n>",
    "Max requests to run in parallel in batch mode (default: 4)",
    parseConcurrency
  );
}
44 changes: 44 additions & 0 deletions src/batch/services/binary-directory-sink.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { mkdirSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import type { BatchResult } from "../types/batch-result.js";
import type { BatchSink } from "../types/batch-sink.js";
import { batchItemFilename } from "./batch-item-filename.js";
import { toBatchRecord } from "./batch-record.js";
import { uniqueName } from "./unique-name.js";

/** Type guard: true only when `value` is a `Uint8Array` (or a subclass). */
function isBytes(value: unknown): value is Uint8Array {
  if (value instanceof Uint8Array) {
    return true;
  }
  return false;
}

/**
 * Build a per-item sink for batches whose results are binary (for example
 * screenshots), which cannot be streamed as ndjson. `dir` is created up
 * front (recursively). Each result whose `ok` flag is set and whose `data` is
 * a `Uint8Array` is written to `<name>.<extension>`; every other result is
 * serialised via `toBatchRecord` into `<name>.error.json`.
 *
 * @param dir - Directory that receives one file per batch item.
 * @param extension - File extension for successful items (default `"png"`).
 * @returns A sink exposing a synchronous `write`.
 */
export function createBinaryDirectorySink(
  dir: string,
  extension = "png"
): BatchSink {
  mkdirSync(dir, { recursive: true });

  // Names already handed out, so repeated inputs never overwrite each other.
  const takenNames = new Set<string>();

  const write = (result: BatchResult): void => {
    const baseName = batchItemFilename(result.input, result.index);
    const stem = uniqueName(baseName, takenNames);

    if (result.ok && isBytes(result.data)) {
      const binaryPath = join(dir, `${stem}.${extension}`);
      writeFileSync(binaryPath, result.data);
      return;
    }

    const errorPath = join(dir, `${stem}.error.json`);
    const record = JSON.stringify(toBatchRecord(result), null, 2);
    writeFileSync(errorPath, record, "utf8");
  };

  return { write };
}
17 changes: 1 addition & 16 deletions src/batch/services/directory-sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,12 @@ import type { BatchResult } from "../types/batch-result.js";
import type { BatchSink } from "../types/batch-sink.js";
import { batchItemFilename } from "./batch-item-filename.js";
import { toBatchRecord } from "./batch-record.js";
import { uniqueName } from "./unique-name.js";

/** Options accepted by the per-item directory sink. */
export interface DirectorySinkOptions {
/** NOTE(review): presumably toggles indented JSON output — confirm against the sink implementation. */
pretty?: boolean;
}

/**
 * Pick a name not yet present in `used`: `base` itself when free, otherwise
 * `base-2`, `base-3`, ... using the first free suffix. The chosen name is
 * added to `used` before returning.
 */
function uniqueName(base: string, used: Set<string>): string {
  let candidate = base;
  for (let suffix = 2; used.has(candidate); suffix++) {
    candidate = `${base}-${suffix}`;
  }
  used.add(candidate);
  return candidate;
}

/**
* Per-item sink: writes one `<name>.json` file per result into `dir` (created
* if needed). File names come from the input URL slug or row index, deduped on
Expand Down
15 changes: 15 additions & 0 deletions src/batch/services/resolve-concurrency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { ValidationError } from "@decodo/sdk-ts";
import { DEFAULT_CONCURRENCY } from "../constants.js";

/**
 * Commander option parser for `--concurrency`.
 *
 * Only a plain decimal integer >= 1 is accepted (surrounding whitespace and a
 * leading `+` are tolerated). The whole string is validated before parsing
 * because `Number.parseInt` on its own stops at the first non-digit and would
 * silently turn `"2.5"` into 2 or `"3abc"` into 3.
 *
 * @param value - Raw option value from the command line.
 * @returns The parsed concurrency.
 * @throws ValidationError when `value` is not a positive integer.
 */
export function parseConcurrency(value: string): number {
  const trimmed = value.trim();
  const parsed = /^\+?\d+$/.test(trimmed)
    ? Number.parseInt(trimmed, 10)
    : Number.NaN;

  // isSafeInteger rejects NaN as well as digit strings too large to represent exactly.
  if (!Number.isSafeInteger(parsed) || parsed < 1) {
    throw new ValidationError("--concurrency must be a positive integer.");
  }
  return parsed;
}

/**
 * Resolve the effective concurrency: the supplied value when one is present,
 * otherwise `DEFAULT_CONCURRENCY`.
 */
export function resolveConcurrency(value: number | undefined): number {
  if (value == null) {
    return DEFAULT_CONCURRENCY;
  }
  return value;
}
73 changes: 73 additions & 0 deletions src/batch/services/run-batch-command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { ValidationError } from "@decodo/sdk-ts";
import { CliUsageError } from "../../platform/services/handle-cli-error.js";
import type { BatchSummary } from "../types/batch-result.js";
import type { BatchSink } from "../types/batch-sink.js";
import { createBinaryDirectorySink } from "./binary-directory-sink.js";
import { createDirectorySink } from "./directory-sink.js";
import { createNdjsonStdoutSink } from "./ndjson-stdout-sink.js";
import { readInputFile } from "./read-input-file.js";
import { runBatch } from "./run-batch.js";

/** Inputs for `runBatchCommand`. */
export interface RunBatchCommandOptions {
/** Target produces binary output (e.g. screenshots). Requires `output`. */
binary?: boolean;
/** Concurrency setting forwarded to `runBatch`. */
concurrency: number;
/** Forwarded to `readInputFile` as its `inputColumn` option. */
inputColumn?: string;
/** Path of the input file handed to `readInputFile`. */
inputFile: string;
/** Output directory; when set, one file per item is written here. */
output?: string;
/** Forwarded to the directory sink as its `pretty` option (non-binary runs with `output` set). */
pretty?: boolean;
/** Runs a single input; resolves to the payload (or bytes) to emit. */
scrapeItem: (input: string) => Promise<unknown>;
}

/**
 * Choose the sink for a batch run:
 * - no `output`: ndjson on stdout, unless the run is binary, which is a usage
 *   error because binary results need a directory;
 * - `output` with `binary`: binary directory sink;
 * - `output` without `binary`: directory sink (honouring `pretty`).
 *
 * @throws CliUsageError when `binary` is set without `output`.
 */
function selectSink(options: RunBatchCommandOptions): BatchSink {
  const { binary, output, pretty } = options;

  if (!output) {
    if (binary) {
      throw new CliUsageError(
        "Batch mode for binary output requires -o <dir> to write files."
      );
    }
    return createNdjsonStdoutSink();
  }

  return binary
    ? createBinaryDirectorySink(output)
    : createDirectorySink(output, { pretty });
}

/**
 * Drive a batch run end to end: read inputs from the file, execute each via
 * `scrapeItem` with the configured concurrency, and emit results to the
 * selected sink (ndjson stdout by default, or one file per item when `output`
 * is set). A summary line is printed to stderr on completion.
 *
 * The sink is closed even when the run itself rejects, so a sink that
 * implements `close` is never left open on the failure path.
 *
 * @param options - Batch configuration; see `RunBatchCommandOptions`.
 * @returns The summary produced by `runBatch`.
 * @throws CliUsageError when binary output is requested without `output`.
 * @throws ValidationError when the input file yields no inputs.
 */
export async function runBatchCommand(
  options: RunBatchCommandOptions
): Promise<BatchSummary> {
  const sink = selectSink(options);
  const items = readInputFile(options.inputFile, {
    inputColumn: options.inputColumn,
  });

  let summary: BatchSummary;
  try {
    summary = await runBatch({
      items,
      concurrency: options.concurrency,
      worker: (item) => options.scrapeItem(item.input),
      onResult: (result) => sink.write(result),
    });
  } finally {
    // `close` is optional on sinks; run it on success and failure alike.
    await sink.close?.();
  }

  if (summary.total === 0) {
    throw new ValidationError("Input file produced no inputs.");
  }

  console.error(
    `Batch complete: ${summary.succeeded} succeeded, ${summary.failed} failed (${summary.total} total).`
  );

  return summary;
}
19 changes: 19 additions & 0 deletions src/batch/services/unique-name.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
 * Reserve a name in `used`. `base` is taken as-is when it is still free;
 * otherwise the lowest free `base-N` (N starting at 2) is chosen. The
 * returned name is recorded in `used`.
 */
export function uniqueName(base: string, used: Set<string>): string {
  let name = base;
  let counter = 1;

  while (used.has(name)) {
    counter += 1;
    name = `${base}-${counter}`;
  }

  used.add(name);
  return name;
}
6 changes: 6 additions & 0 deletions src/batch/types/batch-flags.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/** Batch flags attached to scrape-style commands. */
export interface BatchFlags {
/** Parsed `--concurrency` value: max requests to run in parallel in batch mode. */
concurrency?: number;
/** `--input-column`: column to read inputs from when the input file is a CSV. */
inputColumn?: string;
/** `--input-file`: path to a .txt or .csv file whose lines/rows are the batch items. */
inputFile?: string;
}
7 changes: 5 additions & 2 deletions src/scrape/commands/scrape.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { type DecodoSchema, Target, ValidationError } from "@decodo/sdk-ts";
import { Command } from "commander";
import { attachBatchOptions } from "../../batch/commands/attach-batch-options.js";
import { attachScrapeOutputOptions } from "../../output/commands/attach-output-options.js";
import { applyRequestDefaults } from "../../output/services/apply-request-defaults.js";
import type { OutputOptions } from "../../output/types/output-options.js";
import { CliUsageError } from "../../platform/services/handle-cli-error.js";
import { resolveTarget } from "../services/resolve-target.js";
import { createTargetAction } from "../services/run-target-scrape.js";
import type { ScrapeOptions } from "../types/scrape-command.js";
Expand Down Expand Up @@ -32,17 +34,18 @@ export function createScrapeCommand(schema: DecodoSchema): Command {
.description(
"Scrape a URL with the universal target (markdown by default). Use decodo universal for --markdown, --parse, and other API flags."
)
.argument("<url>", "URL to scrape")
.argument("[url]", "URL to scrape (omit when using --input-file)")
.option("--country <code>", "Geo / country code (maps to geo)")
.option("--headers <json>", "Request headers as a JSON object string")
.option("--target <name>", "Scrape target override (default: universal)");

attachScrapeOutputOptions(command);
attachBatchOptions(command);

return command.action(
createTargetAction(Target.Universal, schema, (url, options) => {
if (url === undefined) {
throw new Error("Missing required URL.");
throw new CliUsageError("Missing required URL.");
}

const opts = options as ScrapeOptions & OutputOptions;
Expand Down
7 changes: 5 additions & 2 deletions src/scrape/commands/screenshot.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { type DecodoSchema, Target } from "@decodo/sdk-ts";
import { Command } from "commander";
import { attachBatchOptions } from "../../batch/commands/attach-batch-options.js";
import { attachScrapeOutputOptions } from "../../output/commands/attach-output-options.js";
import { CliUsageError } from "../../platform/services/handle-cli-error.js";
import { resolveTarget } from "../services/resolve-target.js";
import { createTargetAction } from "../services/run-target-scrape.js";
import type { ScreenshotOptions } from "../types/screenshot-command.js";
Expand All @@ -10,21 +12,22 @@ export function createScreenshotCommand(schema: DecodoSchema): Command {
.description(
"Capture a PNG screenshot (universal, headless). Use decodo universal --headless png for full options."
)
.argument("<url>", "URL to screenshot")
.argument("[url]", "URL to screenshot (omit when using --input-file)")
.option("--country <code>", "Geo / country code (maps to geo)")
.option("--target <name>", "Scrape target override (default: universal)");

attachScrapeOutputOptions(command, {
outputHelp: "Write PNG to file or directory (default name: <host>.png)",
});
attachBatchOptions(command);

return command.action(
createTargetAction(
Target.Universal,
schema,
(url, options) => {
if (url === undefined) {
throw new Error("Missing required URL.");
throw new CliUsageError("Missing required URL.");
}

const opts = options as ScreenshotOptions;
Expand Down
7 changes: 5 additions & 2 deletions src/scrape/commands/search.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { type DecodoSchema, Target, ValidationError } from "@decodo/sdk-ts";
import { Command, Option } from "commander";
import { attachBatchOptions } from "../../batch/commands/attach-batch-options.js";
import { attachScrapeOutputOptions } from "../../output/commands/attach-output-options.js";
import { applyRequestDefaults } from "../../output/services/apply-request-defaults.js";
import { CliUsageError } from "../../platform/services/handle-cli-error.js";
import { resolveTarget } from "../services/resolve-target.js";
import { createTargetAction } from "../services/run-target-scrape.js";
import type { SearchOptions } from "../types/search-command.js";
Expand Down Expand Up @@ -52,7 +54,7 @@ export function createSearchCommand(schema: DecodoSchema): Command {
.description(
"Search the web (default: Google). Use decodo google-search or decodo bing-search for full options."
)
.argument("<query>", "Search query")
.argument("[query]", "Search query (omit when using --input-file)")
.addOption(
new Option("--engine <engine>", "Search engine")
.choices(["google", "bing"])
Expand All @@ -63,11 +65,12 @@ export function createSearchCommand(schema: DecodoSchema): Command {
.option("--target <name>", "Scrape target override");

attachScrapeOutputOptions(command);
attachBatchOptions(command);

return command.action(
createTargetAction(Target.GoogleSearch, schema, (query, options) => {
if (query === undefined) {
throw new Error("Missing required query.");
throw new CliUsageError("Missing required query.");
}

const opts = options as SearchOptions;
Expand Down
9 changes: 7 additions & 2 deletions src/scrape/services/command-builder.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import type { DecodoSchema } from "@decodo/sdk-ts";
import { type Command, Option } from "commander";
import type { JSONSchema4 } from "json-schema";
import { attachBatchOptions } from "../../batch/commands/attach-batch-options.js";
import { attachScrapeOutputOptions } from "../../output/commands/attach-output-options.js";
import { applyRequestDefaults } from "../../output/services/apply-request-defaults.js";
import { CliUsageError } from "../../platform/services/handle-cli-error.js";
import type { TargetCommandConfig } from "../types/target-command.js";
import { snakeToCamel, snakeToKebab } from "./naming.js";
import { getPrimaryInputField } from "./primary-input.js";
Expand Down Expand Up @@ -81,7 +83,7 @@ export function configureTargetCommand(
| undefined;
const inputHelp =
primarySchema?.description ?? `Primary ${primaryField} input`;
command.argument("<input>", inputHelp);
command.argument("[input]", inputHelp);
}

const optionFields = Object.keys(parameterSchema?.properties ?? {}).filter(
Expand All @@ -94,6 +96,7 @@ export function configureTargetCommand(
}

attachScrapeOutputOptions(command);
attachBatchOptions(command);

return { target, primaryField, optionFields };
}
Expand All @@ -109,7 +112,9 @@ export function buildScrapeBody(

if (config.primaryField) {
if (input === undefined) {
throw new Error(`Missing required input for ${config.primaryField}.`);
throw new CliUsageError(
`Missing required input for ${config.primaryField}.`
);
}
body[config.primaryField] = input;
}
Expand Down
Loading
Loading