diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 09347d6d7dc2c..4956fe46f43de 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -23,7 +23,7 @@ use crate::print_format::PrintFormat; use crate::{ command::{Command, OutputFormat}, helper::CliHelper, - object_storage::get_object_store, + object_storage::{get_object_store, stdin::StdinUtils}, print_options::{MaxRows, PrintOptions}, }; use datafusion::common::instant::Instant; @@ -418,9 +418,14 @@ async fn create_plan( // Note that cmd is a mutable reference so that create_external_table function can remove all // datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion // will raise Configuration errors. - if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { // To support custom formats, treat error as None let format = config_file_type_from_str(&cmd.file_type); + + // Expose stdin (e.g. `cat data.csv | datafusion-cli`) as a `stdin://` + // object store, registered like any other scheme in `get_object_store`. + cmd.location = StdinUtils::rewrite_location(&cmd.location, format.as_ref()); + register_object_store_and_config_extensions( ctx, &cmd.location, diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 935bf0a9744dd..e6da95af2eec9 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -34,6 +34,7 @@ use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::{ ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc, }; +use datafusion_cli::object_storage::StdinCarriesCommands; use datafusion_cli::object_storage::instrumented::{ InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, }; @@ -158,6 +159,14 @@ struct Args { object_store_profiling: InstrumentedObjectStoreMode, } +impl Args { + /// Without -c/-f the CLI enters the REPL, which reads its SQL from + /// stdin — interactively or piped. + fn repl_mode(&self) -> bool { + self.command.is_empty() && self.file.is_empty() + } +} + #[tokio::main] /// Calls [`main_inner`], then handles printing errors and returning the correct exit code pub async fn main() -> ExitCode { @@ -268,6 +277,7 @@ async fn main_inner() -> Result<()> { instrumented_registry: Arc::clone(&instrumented_registry), }; + let repl_mode = args.repl_mode(); let commands = args.command; let files = args.file; let rc = match args.rc { @@ -285,7 +295,7 @@ async fn main_inner() -> Result<()> { } }; - if commands.is_empty() && files.is_empty() { + if repl_mode { if !rc.is_empty() { exec::exec_from_files(&ctx, rc, &print_options).await?; } @@ -330,8 +340,16 @@ fn get_session_config(args: &Args) -> Result { config_options.format.null = String::from("NULL"); } - let session_config = + let mut session_config = SessionConfig::from(config_options).with_information_schema(true); + + if args.repl_mode() { + // In the REPL stdin carries the SQL (including for any rc file run + // before it), so it cannot also serve as a data source for + // `LOCATION '/dev/stdin'`. + session_config = session_config.with_extension(Arc::new(StdinCarriesCommands)); + } + Ok(session_config) } diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 34787838929f1..c11c87a97aff7 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -16,6 +16,9 @@ // under the License. pub mod instrumented; +pub(crate) mod stdin; + +pub use stdin::StdinCarriesCommands; use async_trait::async_trait; use aws_config::BehaviorVersion; @@ -564,6 +567,9 @@ pub(crate) async fn get_object_store( .with_url(url.origin().ascii_serialization()) .build()?, ), + _ if scheme == stdin::StdinUtils::SCHEME => { + stdin::StdinUtils::get_or_create(state, url).await? + } _ => { // For other types, try to get from `object_store_registry`: state diff --git a/datafusion-cli/src/object_storage/stdin.rs b/datafusion-cli/src/object_storage/stdin.rs new file mode 100644 index 0000000000000..b83c2c64b53b4 --- /dev/null +++ b/datafusion-cli/src/object_storage/stdin.rs @@ -0,0 +1,373 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Exposes the process's standard input as a `stdin://` object store so that +//! piped data (e.g. `cat data.csv | datafusion-cli`) can be queried via +//! `CREATE EXTERNAL TABLE ... LOCATION '/dev/stdin'`. + +use std::io::{IsTerminal, Read}; +use std::sync::Arc; + +use datafusion::common::exec_datafusion_err; +use datafusion::config::ConfigFileType; +use datafusion::error::Result; +use datafusion::execution::context::SessionState; +use futures::TryStreamExt; + +use object_store::memory::InMemory; +use object_store::path::Path as ObjectStorePath; +use object_store::{ObjectStore, ObjectStoreExt}; +use url::Url; + +/// Marker [`SessionConfig`] extension recording that the session reads its SQL +/// commands from stdin (the interactive or piped REPL). stdin cannot then also +/// serve as a data source: reading it for table data would silently consume +/// the remaining SQL statements. +/// +/// [`SessionConfig`]: datafusion::execution::context::SessionConfig +#[derive(Debug)] +pub struct StdinCarriesCommands; + +/// Utilities for exposing the process's standard input as an object store. +/// +/// stdin is surfaced as a `stdin://` object store and dispatched alongside the +/// other schemes (`s3`, `gs`, `http`, ...) so that reading piped data flows +/// through the normal object-store/listing code path, conceptually similar to +/// DuckDB's `PipeFileSystem`. +pub(crate) struct StdinUtils; + +impl StdinUtils { + /// The URL scheme used to expose stdin as an object store, mirroring how + /// `s3`, `gs`, `http`, etc. are addressed. + pub(crate) const SCHEME: &'static str = "stdin"; + + /// Filesystem paths that refer to the process's standard input. + /// + /// These are intentionally limited to the well known pseudo-files exposed + /// by the operating system so that ordinary files are never accidentally + /// treated as stdin. + const LOCATIONS: [&'static str; 3] = ["/dev/stdin", "/dev/fd/0", "/proc/self/fd/0"]; + + /// Returns `true` if `location` refers to the process's standard input. + fn is_stdin_location(location: &str) -> bool { + Self::LOCATIONS.contains(&location) + } + + /// Rewrites the well known stdin pseudo-paths (e.g. `/dev/stdin`) to a + /// canonical `stdin://` URL so that reading from standard input flows + /// through the same object-store/listing code path as any other scheme. + /// Non-stdin locations are returned unchanged. + /// + /// The listing layer filters candidate files by extension, so the canonical + /// object is named with the extension matching the declared `STORED AS` + /// format. The name thereby also records which format stdin was consumed + /// as: a later stdin-backed table declaring a different format resolves to + /// a path the buffered store does not contain and is rejected by + /// [`Self::get_or_create`]. + pub(crate) fn rewrite_location( + location: &str, + format: Option<&ConfigFileType>, + ) -> String { + if !Self::is_stdin_location(location) { + return location.to_string(); + } + + let object_name = match format { + Some(ConfigFileType::CSV) => "stdin.csv", + Some(ConfigFileType::JSON) => "stdin.json", + Some(ConfigFileType::PARQUET) => "stdin.parquet", + _ => "stdin", + }; + format!("{}:///{object_name}", Self::SCHEME) + } + + /// Returns the object store backing the `stdin://` scheme, reading and + /// buffering standard input on first use and reusing that buffer for any + /// subsequent `stdin://` table created in the same session. + /// + /// stdin is a one-shot stream: it can only be read once. The object store + /// registry keys by scheme/authority, so every `stdin://` URL maps to the + /// same store. Without this guard, a second `CREATE EXTERNAL TABLE ... + /// LOCATION '/dev/stdin'` would re-read (now-EOF) stdin, build an empty + /// store, and overwrite the populated one, silently emptying the earlier + /// table. Reusing the already-registered store avoids that. + /// + /// A later stdin-backed table declaring a different `STORED AS` format + /// resolves to an object the store does not contain (the object name + /// records the format stdin was consumed as) and is rejected with a clear + /// error — both reading the buffer as another format and re-reading stdin + /// would be silently wrong. + pub(crate) async fn get_or_create( + state: &SessionState, + url: &Url, + ) -> Result> { + let Ok(existing) = state.runtime_env().object_store_registry.get_store(url) + else { + return Self::object_store(state, url).await; + }; + + let path = ObjectStorePath::from_url_path(url.path())?; + if existing.head(&path).await.is_err() { + let buffered = existing + .list(None) + .try_next() + .await + .ok() + .flatten() + .map(|meta| format!(" as '{}'", meta.location)) + .unwrap_or_default(); + return Err(exec_datafusion_err!( + "stdin was already read{buffered} by an earlier statement; all \ + tables backed by stdin in a session must declare the same \ + STORED AS format" + )); + } + Ok(existing) + } + + /// Builds the object store backing the `stdin://` scheme by reading all of + /// standard input into memory. + /// + /// A pipe (e.g. `cat data.csv | datafusion-cli`) is not seekable and reports + /// a size of `0`, so it cannot be read directly by the file based formats + /// (CSV requires seeking, Parquet needs the footer at the end of the file). + /// Buffering the whole input up front sidesteps these limitations and lets + /// the data be read like any other object, including being scanned more than + /// once. + async fn object_store( + state: &SessionState, + url: &Url, + ) -> Result> { + if state + .config() + .get_extension::() + .is_some() + { + return Err(exec_datafusion_err!( + "stdin is already being read for SQL commands, so it cannot \ + also supply table data; pass the query with -c/--command or \ + -f/--file so that stdin carries the data, e.g. \ + `cat data.csv | datafusion-cli -f query.sql`" + )); + } + if std::io::stdin().is_terminal() { + return Err(exec_datafusion_err!( + "stdin is connected to a terminal, not piped data; pipe the \ + input in, e.g. `cat data.csv | datafusion-cli -f query.sql`" + )); + } + + let mut buffer = Vec::new(); + std::io::stdin() + .lock() + .read_to_end(&mut buffer) + .map_err(|e| exec_datafusion_err!("Failed to read from stdin: {e}"))?; + Self::in_memory_object_store(url, buffer).await + } + + /// Stores `data` at the path referenced by `url` in a fresh [`InMemory`] + /// store. + async fn in_memory_object_store( + url: &Url, + data: Vec, + ) -> Result> { + let store = InMemory::new(); + store + .put(&ObjectStorePath::from_url_path(url.path())?, data.into()) + .await?; + Ok(Arc::new(store)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use datafusion::prelude::{SessionConfig, SessionContext}; + + #[test] + fn rewrites_stdin_locations() { + // stdin pseudo-paths are rewritten to a `stdin://` URL carrying the + // extension that matches the declared format. + assert_eq!( + StdinUtils::rewrite_location("/dev/stdin", Some(&ConfigFileType::CSV)), + "stdin:///stdin.csv" + ); + assert_eq!( + StdinUtils::rewrite_location("/dev/fd/0", Some(&ConfigFileType::JSON)), + "stdin:///stdin.json" + ); + assert_eq!( + StdinUtils::rewrite_location( + "/proc/self/fd/0", + Some(&ConfigFileType::PARQUET) + ), + "stdin:///stdin.parquet" + ); + assert_eq!( + StdinUtils::rewrite_location("/dev/stdin", None), + "stdin:///stdin" + ); + + // Ordinary locations are left untouched. + for location in ["/dev/stdout", "data/stdin.csv", "stdin", "s3://b/f.csv"] { + assert_eq!( + StdinUtils::rewrite_location(location, Some(&ConfigFileType::CSV)), + location + ); + } + } + + /// Buffers `data` into the `stdin://` object store and reads it back through + /// a `CREATE EXTERNAL TABLE`, returning the number of rows in the table. + /// + /// This exercises the full path used for `/dev/stdin` short of the actual + /// stdin read, which cannot be driven from a unit test. + async fn count_stdin_rows( + data: Vec, + stored_as: &str, + format: Option, + options: &str, + ) -> Result { + let location = StdinUtils::rewrite_location("/dev/stdin", format.as_ref()); + let url = Url::parse(&location).unwrap(); + let store = StdinUtils::in_memory_object_store(&url, data).await?; + + let ctx = SessionContext::new(); + ctx.register_object_store(&url, store); + ctx.sql(&format!( + "CREATE EXTERNAL TABLE t STORED AS {stored_as} LOCATION '{location}' {options}" + )) + .await? + .collect() + .await?; + + ctx.sql("SELECT * FROM t").await?.count().await + } + + #[tokio::test] + async fn reuses_buffered_stdin_store() -> Result<()> { + // stdin can only be read once, so a second `stdin://` table must reuse + // the store buffered by the first instead of re-reading (now-empty) + // stdin and overwriting it. + let url = Url::parse("stdin:///stdin.csv").unwrap(); + let store = + StdinUtils::in_memory_object_store(&url, b"a\n1\n2\n".to_vec()).await?; + + let ctx = SessionContext::new(); + ctx.register_object_store(&url, store); + + let reused = StdinUtils::get_or_create(&ctx.state(), &url).await?; + let path = ObjectStorePath::from_url_path(url.path())?; + let bytes = reused.get(&path).await?.bytes().await?; + assert_eq!(bytes.as_ref(), b"a\n1\n2\n"); + Ok(()) + } + + #[tokio::test] + async fn rejects_second_stdin_table_with_different_format() -> Result<()> { + // The buffered object's name records the format stdin was consumed + // as; a later stdin table declaring a different format must fail with + // a clear error rather than a downstream "not found" (or silently + // misreading the bytes as another format). + let csv_url = Url::parse("stdin:///stdin.csv").unwrap(); + let store = + StdinUtils::in_memory_object_store(&csv_url, b"a\n1\n".to_vec()).await?; + + let ctx = SessionContext::new(); + ctx.register_object_store(&csv_url, store); + + let json_url = Url::parse("stdin:///stdin.json").unwrap(); + let err = StdinUtils::get_or_create(&ctx.state(), &json_url) + .await + .unwrap_err() + .to_string(); + assert!( + err.contains("must declare the same STORED AS format") + && err.contains("stdin.csv"), + "unexpected error: {err}" + ); + Ok(()) + } + + #[tokio::test] + async fn errors_when_stdin_carries_commands() { + // Once the REPL owns stdin for SQL commands, building the stdin store + // must fail with a clear error instead of swallowing the remaining + // statements as table data. + let config = SessionConfig::new().with_extension(Arc::new(StdinCarriesCommands)); + let ctx = SessionContext::new_with_config(config); + + let url = Url::parse("stdin:///stdin.csv").unwrap(); + let err = StdinUtils::get_or_create(&ctx.state(), &url) + .await + .unwrap_err(); + assert!( + err.to_string().contains("SQL commands"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn stdin_object_store_reads_csv() -> Result<()> { + let data = b"a,b\n1,foo\n2,bar\n".to_vec(); + let rows = count_stdin_rows( + data, + "CSV", + Some(ConfigFileType::CSV), + "OPTIONS ('format.has_header' 'true')", + ) + .await?; + assert_eq!(rows, 2); + Ok(()) + } + + #[tokio::test] + async fn stdin_object_store_reads_json() -> Result<()> { + let data = b"{\"a\": 1, \"b\": \"foo\"}\n{\"a\": 2, \"b\": \"bar\"}\n".to_vec(); + let rows = count_stdin_rows(data, "JSON", Some(ConfigFileType::JSON), "").await?; + assert_eq!(rows, 2); + Ok(()) + } + + #[tokio::test] + async fn stdin_object_store_reads_parquet() -> Result<()> { + use datafusion::arrow::array::Int32Array; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::arrow::record_batch::RecordBatch; + use parquet::arrow::ArrowWriter; + + // Parquet requires random access to the footer, which a real pipe cannot + // provide; the in-memory buffer makes this work. + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + + let mut data = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut data, schema, None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let rows = + count_stdin_rows(data, "PARQUET", Some(ConfigFileType::PARQUET), "").await?; + assert_eq!(rows, 3); + Ok(()) + } +} diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 4849ac9e9a5e2..f99d42d1b4200 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -173,6 +173,172 @@ fn cli_quick_test<'a>( assert_cmd_snapshot!(cmd); } +/// Read data piped into the CLI via the `/dev/stdin` pseudo-path. +/// +/// Unix-only: `/dev/stdin` does not exist on Windows. This drives the real +/// binary through an actual pipe, exercising the stdin read that the in-process +/// unit tests cannot. +#[cfg(unix)] +#[test] +fn test_cli_read_from_stdin() { + let stdout = run_cli_with_stdin( + "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin' \ + OPTIONS ('format.has_header' 'true'); \ + SELECT b, count(*) AS c FROM t GROUP BY b ORDER BY b;", + b"a,b\n1,foo\n2,bar\n3,foo\n", + ); + + assert!( + stdout.contains("| foo | 2 |") && stdout.contains("| bar | 1 |"), + "unexpected output:\n{stdout}" + ); +} + +/// stdin is a one-shot stream, so a second `/dev/stdin` table in the same +/// session must reuse the buffered input rather than re-reading (now-empty) +/// stdin and silently emptying the first table. +#[cfg(unix)] +#[test] +fn test_cli_read_from_stdin_twice_reuses_buffer() { + let stdout = run_cli_with_stdin( + "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin' \ + OPTIONS ('format.has_header' 'true'); \ + CREATE EXTERNAL TABLE t2 STORED AS CSV LOCATION '/dev/stdin' \ + OPTIONS ('format.has_header' 'true'); \ + SELECT count(*) AS t_count FROM t; \ + SELECT count(*) AS t2_count FROM t2;", + b"a,b\n1,foo\n2,bar\n", + ); + + // Both tables must still see the two buffered rows. + let counts: Vec<&str> = stdout + .lines() + .filter(|line| line.trim_start().starts_with("| 2 ")) + .collect(); + assert_eq!( + counts.len(), + 2, + "expected both stdin tables to report 2 rows, got:\n{stdout}" + ); +} + +/// A later `/dev/stdin` table declaring a different `STORED AS` format must be +/// rejected with a clear error: stdin is one-shot, its bytes were already +/// buffered under the first table's format, and silently reading them as +/// another format would be wrong. +#[cfg(unix)] +#[test] +fn test_cli_read_from_stdin_mixed_formats_rejected() { + use std::io::Write; + use std::process::Stdio; + + let mut child = cli() + .args([ + "-q", + "--command", + "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin' \ + OPTIONS ('format.has_header' 'true'); \ + CREATE EXTERNAL TABLE t2 STORED AS JSON LOCATION '/dev/stdin';", + ]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("failed to spawn datafusion-cli"); + + child + .stdin + .take() + .unwrap() + .write_all(b"a,b\n1,foo\n2,bar\n") + .unwrap(); + + let output = child.wait_with_output().unwrap(); + // Fatal errors in `--command` mode are reported on stdout. + let stdout = String::from_utf8_lossy(&output.stdout); + + assert!( + !output.status.success(), + "expected the mismatched format to fail, stdout:\n{stdout}" + ); + assert!( + stdout.contains("must declare the same STORED AS format"), + "expected a clear mismatch error, got:\n{stdout}" + ); +} + +/// When the SQL itself arrives on stdin (the piped REPL, e.g. `cat script.sql +/// | datafusion-cli`), stdin cannot double as a data source: the statement +/// must fail with a clear error instead of silently consuming the rest of the +/// script as table data, and the remaining statements must still run. +#[cfg(unix)] +#[test] +fn test_cli_stdin_location_rejected_when_sql_comes_from_stdin() { + use std::io::Write; + use std::process::Stdio; + + let mut child = cli() + .arg("-q") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("failed to spawn datafusion-cli"); + + child + .stdin + .take() + .unwrap() + .write_all( + b"CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin';\n\ + SELECT 123 + 456;\n", + ) + .unwrap(); + + let output = child.wait_with_output().unwrap(); + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + + assert!( + stderr.contains("SQL commands"), + "expected a clear error about stdin carrying SQL.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); + // The statement after the failed CREATE must still execute rather than + // being consumed as table data. + assert!( + stdout.contains("579"), + "expected the following statement to still run.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); +} + +/// Spawns the real `datafusion-cli` binary, pipes `stdin` into it, and returns +/// its stdout after asserting a successful exit. +#[cfg(unix)] +fn run_cli_with_stdin(command: &str, stdin: &[u8]) -> String { + use std::io::Write; + use std::process::Stdio; + + let mut child = cli() + .args(["-q", "--command", command]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("failed to spawn datafusion-cli"); + + child.stdin.take().unwrap().write_all(stdin).unwrap(); + + let output = child.wait_with_output().unwrap(); + let stdout = String::from_utf8_lossy(&output.stdout).into_owned(); + let stderr = String::from_utf8_lossy(&output.stderr); + + assert!( + output.status.success(), + "datafusion-cli failed.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); + stdout +} + #[test] fn cli_explain_environment_overrides() { let mut settings = make_settings(); diff --git a/docs/source/user-guide/cli/datasources.md b/docs/source/user-guide/cli/datasources.md index 6b1a4887a8a0f..59a6b0aa43284 100644 --- a/docs/source/user-guide/cli/datasources.md +++ b/docs/source/user-guide/cli/datasources.md @@ -132,6 +132,30 @@ select count(*) from hits; 1 row in set. Query took 0.344 seconds. ``` +## Reading from standard input + +On Unix-like systems you can pipe data into the CLI and query it by pointing the +`LOCATION` at the `/dev/stdin` pseudo-file: + +```console +$ cat hits.csv | datafusion-cli -c " +CREATE EXTERNAL TABLE hits STORED AS CSV LOCATION '/dev/stdin' OPTIONS ('format.has_header' 'true'); +SELECT count(*) FROM hits;" +``` + +This works for CSV, JSON, and Parquet. Because standard input is not seekable +(and Parquet stores its metadata at the end of the file), the CLI buffers the +entire input into memory before querying it, so the data must fit in memory. +Standard input is read only once: the buffered contents are reused for any +further tables backed by `/dev/stdin` in the same session. Those tables must +declare the same `STORED AS` format as the first one; a differing format is +rejected with an error. + +The SQL must be passed with `-c`/`--command` or `-f`/`--file` so that standard +input is free to carry the data. In the interactive shell (and when SQL is +piped to the CLI without `-c`/`-f`) standard input carries the SQL itself, and +`LOCATION '/dev/stdin'` returns an error. + **Why Wildcards Are Not Supported** Although wildcards (e.g., _.parquet or \*\*/_.parquet) may work for local