feat: support reading from stdin in datafusion-cli#22839
Conversation
Allow `CREATE EXTERNAL TABLE ... LOCATION '/dev/stdin'` so data can be piped into the CLI, e.g. `cat data.csv | datafusion-cli`. stdin is exposed as a `stdin://` object store, dispatched alongside the other schemes in `get_object_store`. The well-known stdin pseudo-paths (`/dev/stdin`, `/dev/fd/0`, `/proc/self/fd/0`) are rewritten to a canonical `stdin://` URL so they flow through the normal listing path. Because a pipe is not seekable and reports a size of 0, the input is buffered into an in-memory object store up front. This works for CSV, JSON, and Parquet (Parquet needs random access to its footer, which a real pipe cannot provide). Closes apache#9430
Move the stdin scheme constant, location helpers, and in-memory object store construction (plus their tests) out of object_storage.rs into a dedicated object_storage/stdin.rs submodule, encapsulated as StdinUtils.
There was a problem hiding this comment.
@huan233usc
Thanks for working on this. I found one blocking issue around /dev/stdin registration and object store replacement that can cause previously created tables to silently change behavior within the same session.
| /// Buffering the whole input up front sidesteps these limitations and lets | ||
| /// the data be read like any other object, including being scanned more than | ||
| /// once. | ||
| pub(crate) async fn object_store(url: &Url) -> Result<Arc<dyn ObjectStore>> { |
There was a problem hiding this comment.
I think there's a correctness issue here. object_store re-reads process stdin every time a stdin:// store is registered.
After the first CREATE EXTERNAL TABLE ... LOCATION '/dev/stdin', the stdin contents have already been consumed and buffered. If a second stdin-backed table is created in the same session, this path reads EOF and replaces the registered stdin:// store with an empty one. As a result, the first table silently changes from containing data to appearing empty.
Repro:
CREATE EXTERNAL TABLE t ... LOCATION '/dev/stdin';
SELECT count(*) FROM t; -- 2
CREATE EXTERNAL TABLE t2 ... LOCATION '/dev/stdin';
SELECT count(*) FROM t; -- 0Could we enforce the one-shot stdin invariant when registering the store? Two approaches that seem reasonable are:
- Reject subsequent
/dev/stdinregistrations with a clear error before replacing the existing store. - Cache and reuse the first buffered store for the same
stdin://URL.
The important part is avoiding replacement of the backing object after tables have already been registered.
There was a problem hiding this comment.
Thanks for the feedback. Yes this is a real correctness bug.
I went with the "reuse the buffered store" option. stdin is read once on the first stdin:// table; subsequent stdin:// tables in the same session reuse the already-registered store instead of re-reading (now-EOF) stdin and overwriting it.
I added an unit test to cover the scenario below
$ printf 'a,b\n1,foo\n2,bar\n' | datafusion-cli -q --command "
CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin' OPTIONS ('format.has_header' 'true');
SELECT count() FROM t; -- 2
CREATE EXTERNAL TABLE t2 STORED AS CSV LOCATION '/dev/stdin' OPTIONS ('format.has_header' 'true');
SELECT count() FROM t; -- 2 (was 0 before the fix)
SELECT count(*) FROM t2; -- 2
"
PTAL. Thanks!
stdin is one-shot. The object store registry keys by scheme/authority, so a second CREATE EXTERNAL TABLE ... LOCATION '/dev/stdin' previously re-read (now-EOF) stdin, built an empty store, and overwrote the populated one, silently emptying the first table. get_or_create now returns the already-registered store instead of re-reading stdin.
Add an integration test that creates two /dev/stdin tables in one session and asserts both still report the buffered rows, guarding the one-shot stdin reuse fix at the binary level. Factor the pipe-spawning boilerplate into a shared run_cli_with_stdin helper.
There was a problem hiding this comment.
@huan233usc
Thanks for addressing the original CSV/CSV repro. Reusing the existing stdin:// store and adding coverage for two CSV /dev/stdin tables in a single CLI session fixes the reported case.
That said, I think there's still a gap around the broader one-shot stdin invariant. The current implementation appears to work when subsequent registrations resolve to the same rewritten object path, but it can still break when later /dev/stdin tables use a different STORED AS format.
| state: &SessionState, | ||
| url: &Url, | ||
| ) -> Result<Arc<dyn ObjectStore>> { | ||
| if let Ok(existing) = state.runtime_env().object_store_registry.get_store(url) { |
There was a problem hiding this comment.
Nice catch reusing the existing registered store, but I think there's still a corner case here.
rewrite_location produces format-specific paths such as stdin:///stdin.csv, stdin:///stdin.json, and stdin:///stdin.parquet. The object-store registry lookup is keyed by scheme/authority rather than path, so after the first table registers stdin:///stdin.csv, a later /dev/stdin table rewritten to stdin:///stdin.json will hit this branch and reuse the existing store.
The problem is that the store only contains /stdin.csv. The second table then attempts to read /stdin.json from that store and fails (or appears missing) rather than reusing the buffered stdin bytes or producing a clear error.
So this fixes the reported CSV/CSV case, but not the broader invariant that subsequent /dev/stdin registrations should remain safe and predictable. Could we either make the stored object path canonical across formats, populate the newly requested path from the buffered stdin data, or explicitly reject a second /dev/stdin registration when the rewritten path differs and return a clear error?
There was a problem hiding this comment.
Thanks @kosiew — went with explicit rejection. The buffered object's name already records what format stdin was read as, so a second /dev/stdin table with a different STORED AS now fails with:
stdin was already read as 'stdin.csv' by an earlier statement; all tables backed by stdin in a session must declare the same STORED AS format
We could revisit sharing the bytes across formats later if there's a real use case, but it can fail silently today (piped NDJSON still "parses" as single-column CSV), so erroring felt safer.
Seperately, I also fixed a footgun I hit while testing: if stdin is already carrying the SQL (REPL, or piping a script without -c/-f), the stdin read used to eat the rest of the script or hang the shell. Now it errors and points to -c/-f. Tests added for both. PTAL!
84a0637 to
0f7a470
Compare
The object store registry keys by scheme/authority, so a second /dev/stdin table with a different STORED AS format reused the registered store but looked up a per-format object path it did not contain. The buffered object's name records the format stdin was consumed as; a later stdin-backed table declaring a different format now fails with a clear error. Sharing the bytes across formats could be silently wrong — e.g. piped NDJSON read by a second CSV table "succeeds" as a one-column table instead of surfacing the mistake. Same-format reuse still works. Also error clearly when stdin cannot carry data: when the REPL owns stdin (interactive, or SQL piped without -c/-f), buffering it for a table would consume the remaining statements as table data (silently, exit 0); when stdin is a TTY the read would block until Ctrl-D with Ctrl-C swallowed. Co-authored-by: Isaac
0f7a470 to
928c06a
Compare
Which issue does this PR close?
datafusion-cli#9430.Rationale for this change
Users frequently want to pipe data into the CLI, e.g.
cat data.csv | datafusion-cli, but pointingLOCATIONat/dev/stdindid not work:Illegal seek(a pipe is not seekable).file size of 0 is less than footer(a pipe reports size 0).This PR makes reading from standard input work for CSV, JSON, and Parquet.
What changes are included in this PR?
stdin is exposed as a
stdin://object store, dispatched alongside the other schemes (s3,gs,http, ...) inget_object_store— conceptually similar to DuckDB'sPipeFileSystem.rewrite_stdin_locationmaps the well-known stdin pseudo-paths (/dev/stdin,/dev/fd/0,/proc/self/fd/0) to a canonicalstdin:///stdin.<ext>URL, so they flow through the normal object-store/listing code path. The extension matches the declaredSTORED ASformat because the listing layer filters candidate files by extension.stdin://store reads all of standard input into an in-memory object store. Buffering up front is required because a pipe is not seekable and Parquet stores its metadata at the end of the file.Known scope/limitations (left as potential follow-ups):
CREATE EXTERNAL TABLEis supported (not dynamicSELECT * FROM '/dev/stdin')./dev/stdindoes not exist on Windows); writing to/dev/stdoutis out of scope.Are these changes tested?
Yes:
object_storage.rscoverrewrite_stdin_locationand end-to-end reads for CSV, JSON, and Parquet via the in-memory store.#[cfg(unix)]integration test incli_integration.rsdrives the real binary through an actual pipe, exercising the real stdin read.Are there any user-facing changes?
Yes — reading from stdin via
LOCATION '/dev/stdin'is now supported. Documented indocs/source/user-guide/cli/datasources.md(new "Reading from standard input" section). No breaking changes.