Skip to content

feat: support reading from stdin in datafusion-cli#22839

Open
huan233usc wants to merge 6 commits into
apache:mainfrom
huan233usc:cli-stdin-9430
Open

feat: support reading from stdin in datafusion-cli#22839
huan233usc wants to merge 6 commits into
apache:mainfrom
huan233usc:cli-stdin-9430

Conversation

@huan233usc

Copy link
Copy Markdown

Which issue does this PR close?

Rationale for this change

Users frequently want to pipe data into the CLI, e.g. cat data.csv | datafusion-cli, but pointing LOCATION at /dev/stdin did not work:

  • CSV failed with Illegal seek (a pipe is not seekable).
  • Parquet failed with file size of 0 is less than footer (a pipe reports size 0).
  • JSON silently returned 0 rows.

This PR makes reading from standard input work for CSV, JSON, and Parquet.

What changes are included in this PR?

stdin is exposed as a stdin:// object store, dispatched alongside the other schemes (s3, gs, http, ...) in get_object_store — conceptually similar to DuckDB's PipeFileSystem.

  • rewrite_stdin_location maps the well-known stdin pseudo-paths (/dev/stdin, /dev/fd/0, /proc/self/fd/0) to a canonical stdin:///stdin.<ext> URL, so they flow through the normal object-store/listing code path. The extension matches the declared STORED AS format because the listing layer filters candidate files by extension.
  • The stdin:// store reads all of standard input into an in-memory object store. Buffering up front is required because a pipe is not seekable and Parquet stores its metadata at the end of the file.

Known scope/limitations (left as potential follow-ups):

  • Only CREATE EXTERNAL TABLE is supported (not dynamic SELECT * FROM '/dev/stdin').
  • Input is fully buffered in memory, so it must fit in memory.
  • stdin can only be consumed once per session.
  • Unix-only (/dev/stdin does not exist on Windows); writing to /dev/stdout is out of scope.

Are these changes tested?

Yes:

  • Unit tests in object_storage.rs cover rewrite_stdin_location and end-to-end reads for CSV, JSON, and Parquet via the in-memory store.
  • A #[cfg(unix)] integration test in cli_integration.rs drives the real binary through an actual pipe, exercising the real stdin read.
  • Manually verified all three formats via real pipes, and confirmed normal local-file reads are unaffected.

Are there any user-facing changes?

Yes — reading from stdin via LOCATION '/dev/stdin' is now supported. Documented in docs/source/user-guide/cli/datasources.md (new "Reading from standard input" section). No breaking changes.

Allow `CREATE EXTERNAL TABLE ... LOCATION '/dev/stdin'` so data can be
piped into the CLI, e.g. `cat data.csv | datafusion-cli`.

stdin is exposed as a `stdin://` object store, dispatched alongside the
other schemes in `get_object_store`. The well-known stdin pseudo-paths
(`/dev/stdin`, `/dev/fd/0`, `/proc/self/fd/0`) are rewritten to a
canonical `stdin://` URL so they flow through the normal listing path.

Because a pipe is not seekable and reports a size of 0, the input is
buffered into an in-memory object store up front. This works for CSV,
JSON, and Parquet (Parquet needs random access to its footer, which a
real pipe cannot provide).

Closes apache#9430
@github-actions github-actions Bot added the documentation Improvements or additions to documentation label Jun 9, 2026
Move the stdin scheme constant, location helpers, and in-memory object
store construction (plus their tests) out of object_storage.rs into a
dedicated object_storage/stdin.rs submodule, encapsulated as StdinUtils.

@kosiew kosiew left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@huan233usc
Thanks for working on this. I found one blocking issue around /dev/stdin registration and object store replacement that can cause previously created tables to silently change behavior within the same session.

/// Buffering the whole input up front sidesteps these limitations and lets
/// the data be read like any other object, including being scanned more than
/// once.
pub(crate) async fn object_store(url: &Url) -> Result<Arc<dyn ObjectStore>> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a correctness issue here. object_store re-reads process stdin every time a stdin:// store is registered.

After the first CREATE EXTERNAL TABLE ... LOCATION '/dev/stdin', the stdin contents have already been consumed and buffered. If a second stdin-backed table is created in the same session, this path reads EOF and replaces the registered stdin:// store with an empty one. As a result, the first table silently changes from containing data to appearing empty.

Repro:

CREATE EXTERNAL TABLE t ... LOCATION '/dev/stdin';
SELECT count(*) FROM t; -- 2

CREATE EXTERNAL TABLE t2 ... LOCATION '/dev/stdin';
SELECT count(*) FROM t; -- 0

Could we enforce the one-shot stdin invariant when registering the store? Two approaches that seem reasonable are:

  • Reject subsequent /dev/stdin registrations with a clear error before replacing the existing store.
  • Cache and reuse the first buffered store for the same stdin:// URL.

The important part is avoiding replacement of the backing object after tables have already been registered.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback. Yes this is a real correctness bug.

I went with the "reuse the buffered store" option. stdin is read once on the first stdin:// table; subsequent stdin:// tables in the same session reuse the already-registered store instead of re-reading (now-EOF) stdin and overwriting it.

I added an unit test to cover the scenario below

$ printf 'a,b\n1,foo\n2,bar\n' | datafusion-cli -q --command "
CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin' OPTIONS ('format.has_header' 'true');
SELECT count() FROM t; -- 2
CREATE EXTERNAL TABLE t2 STORED AS CSV LOCATION '/dev/stdin' OPTIONS ('format.has_header' 'true');
SELECT count(
) FROM t; -- 2 (was 0 before the fix)
SELECT count(*) FROM t2; -- 2
"
PTAL. Thanks!

stdin is one-shot. The object store registry keys by scheme/authority, so
a second CREATE EXTERNAL TABLE ... LOCATION '/dev/stdin' previously
re-read (now-EOF) stdin, built an empty store, and overwrote the
populated one, silently emptying the first table. get_or_create now
returns the already-registered store instead of re-reading stdin.
Add an integration test that creates two /dev/stdin tables in one session
and asserts both still report the buffered rows, guarding the one-shot
stdin reuse fix at the binary level. Factor the pipe-spawning boilerplate
into a shared run_cli_with_stdin helper.
@huan233usc huan233usc requested a review from kosiew June 9, 2026 17:33

@kosiew kosiew left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@huan233usc
Thanks for addressing the original CSV/CSV repro. Reusing the existing stdin:// store and adding coverage for two CSV /dev/stdin tables in a single CLI session fixes the reported case.

That said, I think there's still a gap around the broader one-shot stdin invariant. The current implementation appears to work when subsequent registrations resolve to the same rewritten object path, but it can still break when later /dev/stdin tables use a different STORED AS format.

state: &SessionState,
url: &Url,
) -> Result<Arc<dyn ObjectStore>> {
if let Ok(existing) = state.runtime_env().object_store_registry.get_store(url) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch reusing the existing registered store, but I think there's still a corner case here.

rewrite_location produces format-specific paths such as stdin:///stdin.csv, stdin:///stdin.json, and stdin:///stdin.parquet. The object-store registry lookup is keyed by scheme/authority rather than path, so after the first table registers stdin:///stdin.csv, a later /dev/stdin table rewritten to stdin:///stdin.json will hit this branch and reuse the existing store.

The problem is that the store only contains /stdin.csv. The second table then attempts to read /stdin.json from that store and fails (or appears missing) rather than reusing the buffered stdin bytes or producing a clear error.

So this fixes the reported CSV/CSV case, but not the broader invariant that subsequent /dev/stdin registrations should remain safe and predictable. Could we either make the stored object path canonical across formats, populate the newly requested path from the buffered stdin data, or explicitly reject a second /dev/stdin registration when the rewritten path differs and return a clear error?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @kosiew — went with explicit rejection. The buffered object's name already records what format stdin was read as, so a second /dev/stdin table with a different STORED AS now fails with:

stdin was already read as 'stdin.csv' by an earlier statement; all tables backed by stdin in a session must declare the same STORED AS format

We could revisit sharing the bytes across formats later if there's a real use case, but it can fail silently today (piped NDJSON still "parses" as single-column CSV), so erroring felt safer.

Seperately, I also fixed a footgun I hit while testing: if stdin is already carrying the SQL (REPL, or piping a script without -c/-f), the stdin read used to eat the rest of the script or hang the shell. Now it errors and points to -c/-f. Tests added for both. PTAL!

@huan233usc huan233usc force-pushed the cli-stdin-9430 branch 4 times, most recently from 84a0637 to 0f7a470 Compare June 10, 2026 07:43
The object store registry keys by scheme/authority, so a second
/dev/stdin table with a different STORED AS format reused the
registered store but looked up a per-format object path it did not
contain. The buffered object's name records the format stdin was
consumed as; a later stdin-backed table declaring a different format
now fails with a clear error. Sharing the bytes across formats could
be silently wrong — e.g. piped NDJSON read by a second CSV table
"succeeds" as a one-column table instead of surfacing the mistake.
Same-format reuse still works.

Also error clearly when stdin cannot carry data: when the REPL owns
stdin (interactive, or SQL piped without -c/-f), buffering it for a
table would consume the remaining statements as table data (silently,
exit 0); when stdin is a TTY the read would block until Ctrl-D with
Ctrl-C swallowed.

Co-authored-by: Isaac
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Improvements or additions to documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

support for reading data from stdin for datafusion-cli

2 participants