From 6eb3866687f91093477a183e4cea2214d7acbf9f Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 8 Jun 2026 22:13:53 -0700 Subject: [PATCH 1/6] feat: support reading from stdin in datafusion-cli Allow `CREATE EXTERNAL TABLE ... LOCATION '/dev/stdin'` so data can be piped into the CLI, e.g. `cat data.csv | datafusion-cli`. stdin is exposed as a `stdin://` object store, dispatched alongside the other schemes in `get_object_store`. The well-known stdin pseudo-paths (`/dev/stdin`, `/dev/fd/0`, `/proc/self/fd/0`) are rewritten to a canonical `stdin://` URL so they flow through the normal listing path. Because a pipe is not seekable and reports a size of 0, the input is buffered into an in-memory object store up front. This works for CSV, JSON, and Parquet (Parquet needs random access to its footer, which a real pipe cannot provide). Closes #9430 --- datafusion-cli/src/exec.rs | 9 +- datafusion-cli/src/object_storage.rs | 179 +++++++++++++++++++++- datafusion-cli/tests/cli_integration.rs | 46 ++++++ docs/source/user-guide/cli/datasources.md | 17 ++ 4 files changed, 248 insertions(+), 3 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 09347d6d7dc2c..e4245e7da7d5c 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -23,7 +23,7 @@ use crate::print_format::PrintFormat; use crate::{ command::{Command, OutputFormat}, helper::CliHelper, - object_storage::get_object_store, + object_storage::{get_object_store, rewrite_stdin_location}, print_options::{MaxRows, PrintOptions}, }; use datafusion::common::instant::Instant; @@ -418,9 +418,14 @@ async fn create_plan( // Note that cmd is a mutable reference so that create_external_table function can remove all // datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion // will raise Configuration errors. 
- if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { + if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { // To support custom formats, treat error as None let format = config_file_type_from_str(&cmd.file_type); + + // Expose stdin (e.g. `cat data.csv | datafusion-cli`) as a `stdin://` + // object store, registered like any other scheme in `get_object_store`. + cmd.location = rewrite_stdin_location(&cmd.location, format.as_ref()); + register_object_store_and_config_extensions( ctx, &cmd.location, diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 34787838929f1..32506b6dd757c 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -28,6 +28,7 @@ use datafusion::{ config::ExtensionOptions, config::TableOptions, config::Visit, config_err, exec_datafusion_err, exec_err, }, + config::ConfigFileType, error::{DataFusionError, Result}, execution::context::SessionState, }; @@ -35,11 +36,14 @@ use log::debug; use object_store::{ ClientOptions, CredentialProvider, Error::Generic, - ObjectStore, + ObjectStore, ObjectStoreExt, aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential}, gcp::GoogleCloudStorageBuilder, http::HttpBuilder, + memory::InMemory, + path::Path as ObjectStorePath, }; +use std::io::Read; use std::{ any::Any, error::Error, @@ -564,6 +568,7 @@ pub(crate) async fn get_object_store( .with_url(url.origin().ascii_serialization()) .build()?, ), + STDIN_SCHEME => stdin_object_store(url).await?, _ => { // For other types, try to get from `object_store_registry`: state @@ -578,6 +583,75 @@ pub(crate) async fn get_object_store( Ok(store) } +/// The URL scheme used to expose the process's standard input as an object +/// store, mirroring how `s3`, `gs`, `http`, etc. are addressed. +const STDIN_SCHEME: &str = "stdin"; + +/// Filesystem paths that refer to the process's standard input. 
+/// +/// These are intentionally limited to the well known pseudo-files exposed by +/// the operating system so that ordinary files are never accidentally treated +/// as stdin. +const STDIN_LOCATIONS: [&str; 3] = ["/dev/stdin", "/dev/fd/0", "/proc/self/fd/0"]; + +/// Returns `true` if `location` refers to the process's standard input. +fn is_stdin_location(location: &str) -> bool { + STDIN_LOCATIONS.contains(&location) +} + +/// Rewrites the well known stdin pseudo-paths (e.g. `/dev/stdin`) to a canonical +/// `stdin://` URL so that reading from standard input flows through the same +/// object-store/listing code path as any other scheme. Non-stdin locations are +/// returned unchanged. +/// +/// The listing layer filters candidate files by extension, so the canonical +/// object is named with the extension matching the declared `STORED AS` format. +pub(crate) fn rewrite_stdin_location( + location: &str, + format: Option<&ConfigFileType>, +) -> String { + if !is_stdin_location(location) { + return location.to_string(); + } + + let object_name = match format { + Some(ConfigFileType::CSV) => "stdin.csv", + Some(ConfigFileType::JSON) => "stdin.json", + Some(ConfigFileType::PARQUET) => "stdin.parquet", + _ => "stdin", + }; + format!("{STDIN_SCHEME}:///{object_name}") +} + +/// Builds the object store backing the `stdin://` scheme by reading all of +/// standard input into memory. +/// +/// A pipe (e.g. `cat data.csv | datafusion-cli`) is not seekable and reports a +/// size of `0`, so it cannot be read directly by the file based formats (CSV +/// requires seeking, Parquet needs the footer at the end of the file). Buffering +/// the whole input up front sidesteps these limitations and lets the data be +/// read like any other object, including being scanned more than once. 
+async fn stdin_object_store(url: &Url) -> Result> { + let mut buffer = Vec::new(); + std::io::stdin() + .lock() + .read_to_end(&mut buffer) + .map_err(|e| exec_datafusion_err!("Failed to read from stdin: {e}"))?; + in_memory_object_store(url, buffer).await +} + +/// Stores `data` at the path referenced by `url` in a fresh [`InMemory`] store. +async fn in_memory_object_store( + url: &Url, + data: Vec, +) -> Result> { + let store = InMemory::new(); + store + .put(&ObjectStorePath::from_url_path(url.path())?, data.into()) + .await?; + Ok(Arc::new(store)) +} + #[cfg(test)] mod tests { use crate::cli_context::CliSessionContext; @@ -917,4 +991,107 @@ mod tests { } Ok(()) } + + #[test] + fn rewrites_stdin_locations() { + // stdin pseudo-paths are rewritten to a `stdin://` URL carrying the + // extension that matches the declared format. + assert_eq!( + rewrite_stdin_location("/dev/stdin", Some(&ConfigFileType::CSV)), + "stdin:///stdin.csv" + ); + assert_eq!( + rewrite_stdin_location("/dev/fd/0", Some(&ConfigFileType::JSON)), + "stdin:///stdin.json" + ); + assert_eq!( + rewrite_stdin_location("/proc/self/fd/0", Some(&ConfigFileType::PARQUET)), + "stdin:///stdin.parquet" + ); + assert_eq!(rewrite_stdin_location("/dev/stdin", None), "stdin:///stdin"); + + // Ordinary locations are left untouched. + for location in ["/dev/stdout", "data/stdin.csv", "stdin", "s3://b/f.csv"] { + assert_eq!( + rewrite_stdin_location(location, Some(&ConfigFileType::CSV)), + location + ); + } + } + + /// Buffers `data` into the `stdin://` object store and reads it back through + /// a `CREATE EXTERNAL TABLE`, returning the number of rows in the table. + /// + /// This exercises the full path used for `/dev/stdin` short of the actual + /// stdin read, which cannot be driven from a unit test. 
+ async fn count_stdin_rows( + data: Vec, + stored_as: &str, + format: Option, + options: &str, + ) -> Result { + let location = rewrite_stdin_location("/dev/stdin", format.as_ref()); + let url = Url::parse(&location).unwrap(); + let store = in_memory_object_store(&url, data).await?; + + let ctx = SessionContext::new(); + ctx.register_object_store(&url, store); + ctx.sql(&format!( + "CREATE EXTERNAL TABLE t STORED AS {stored_as} LOCATION '{location}' {options}" + )) + .await? + .collect() + .await?; + + ctx.sql("SELECT * FROM t").await?.count().await + } + + #[tokio::test] + async fn stdin_object_store_reads_csv() -> Result<()> { + let data = b"a,b\n1,foo\n2,bar\n".to_vec(); + let rows = count_stdin_rows( + data, + "CSV", + Some(ConfigFileType::CSV), + "OPTIONS ('format.has_header' 'true')", + ) + .await?; + assert_eq!(rows, 2); + Ok(()) + } + + #[tokio::test] + async fn stdin_object_store_reads_json() -> Result<()> { + let data = b"{\"a\": 1, \"b\": \"foo\"}\n{\"a\": 2, \"b\": \"bar\"}\n".to_vec(); + let rows = count_stdin_rows(data, "JSON", Some(ConfigFileType::JSON), "").await?; + assert_eq!(rows, 2); + Ok(()) + } + + #[tokio::test] + async fn stdin_object_store_reads_parquet() -> Result<()> { + use datafusion::arrow::array::Int32Array; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::arrow::record_batch::RecordBatch; + use parquet::arrow::ArrowWriter; + + // Parquet requires random access to the footer, which a real pipe cannot + // provide; the in-memory buffer makes this work. 
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + + let mut data = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut data, schema, None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let rows = + count_stdin_rows(data, "PARQUET", Some(ConfigFileType::PARQUET), "").await?; + assert_eq!(rows, 3); + Ok(()) + } } diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 4849ac9e9a5e2..c0947c077bd6e 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -173,6 +173,52 @@ fn cli_quick_test<'a>( assert_cmd_snapshot!(cmd); } +/// Read data piped into the CLI via the `/dev/stdin` pseudo-path. +/// +/// Unix-only: `/dev/stdin` does not exist on Windows. This drives the real +/// binary through an actual pipe, exercising the stdin read that the in-process +/// unit tests cannot. 
+#[cfg(unix)] +#[test] +fn test_cli_read_from_stdin() { + use std::io::Write; + use std::process::Stdio; + + let mut child = cli() + .args([ + "-q", + "--command", + "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin' \ + OPTIONS ('format.has_header' 'true'); \ + SELECT b, count(*) AS c FROM t GROUP BY b ORDER BY b;", + ]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("failed to spawn datafusion-cli"); + + child + .stdin + .take() + .unwrap() + .write_all(b"a,b\n1,foo\n2,bar\n3,foo\n") + .unwrap(); + + let output = child.wait_with_output().unwrap(); + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + + assert!( + output.status.success(), + "datafusion-cli failed.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); + assert!( + stdout.contains("| foo | 2 |") && stdout.contains("| bar | 1 |"), + "unexpected output:\n{stdout}" + ); +} + #[test] fn cli_explain_environment_overrides() { let mut settings = make_settings(); diff --git a/docs/source/user-guide/cli/datasources.md b/docs/source/user-guide/cli/datasources.md index 6b1a4887a8a0f..1a85b9bd9777d 100644 --- a/docs/source/user-guide/cli/datasources.md +++ b/docs/source/user-guide/cli/datasources.md @@ -132,6 +132,23 @@ select count(*) from hits; 1 row in set. Query took 0.344 seconds. ``` +## Reading from standard input + +On Unix-like systems you can pipe data into the CLI and query it by pointing the +`LOCATION` at the `/dev/stdin` pseudo-file: + +```console +$ cat hits.csv | datafusion-cli -c " +CREATE EXTERNAL TABLE hits STORED AS CSV LOCATION '/dev/stdin' OPTIONS ('format.has_header' 'true'); +SELECT count(*) FROM hits;" +``` + +This works for CSV, JSON, and Parquet. Because standard input is not seekable +(and Parquet stores its metadata at the end of the file), the CLI buffers the +entire input into memory before querying it, so the data must fit in memory. 
+Standard input can only be consumed once, so a single session can create at most +one table backed by `/dev/stdin`. + **Why Wildcards Are Not Supported** Although wildcards (e.g., _.parquet or \*\*/_.parquet) may work for local From 9f68e244c322ae02f25316d10cf425f009e58e65 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 8 Jun 2026 22:21:59 -0700 Subject: [PATCH 2/6] refactor: group stdin object store helpers into StdinUtils Move the stdin scheme constant, location helpers, and in-memory object store construction (plus their tests) out of object_storage.rs into a dedicated object_storage/stdin.rs submodule, encapsulated as StdinUtils. --- datafusion-cli/src/exec.rs | 4 +- datafusion-cli/src/object_storage.rs | 183 +--------------- datafusion-cli/src/object_storage/stdin.rs | 230 +++++++++++++++++++++ 3 files changed, 237 insertions(+), 180 deletions(-) create mode 100644 datafusion-cli/src/object_storage/stdin.rs diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index e4245e7da7d5c..4956fe46f43de 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -23,7 +23,7 @@ use crate::print_format::PrintFormat; use crate::{ command::{Command, OutputFormat}, helper::CliHelper, - object_storage::{get_object_store, rewrite_stdin_location}, + object_storage::{get_object_store, stdin::StdinUtils}, print_options::{MaxRows, PrintOptions}, }; use datafusion::common::instant::Instant; @@ -424,7 +424,7 @@ async fn create_plan( // Expose stdin (e.g. `cat data.csv | datafusion-cli`) as a `stdin://` // object store, registered like any other scheme in `get_object_store`. 
- cmd.location = rewrite_stdin_location(&cmd.location, format.as_ref()); + cmd.location = StdinUtils::rewrite_location(&cmd.location, format.as_ref()); register_object_store_and_config_extensions( ctx, diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 32506b6dd757c..191c7b2179e42 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -16,6 +16,7 @@ // under the License. pub mod instrumented; +pub(crate) mod stdin; use async_trait::async_trait; use aws_config::BehaviorVersion; @@ -28,7 +29,6 @@ use datafusion::{ config::ExtensionOptions, config::TableOptions, config::Visit, config_err, exec_datafusion_err, exec_err, }, - config::ConfigFileType, error::{DataFusionError, Result}, execution::context::SessionState, }; @@ -36,14 +36,11 @@ use log::debug; use object_store::{ ClientOptions, CredentialProvider, Error::Generic, - ObjectStore, ObjectStoreExt, + ObjectStore, aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential}, gcp::GoogleCloudStorageBuilder, http::HttpBuilder, - memory::InMemory, - path::Path as ObjectStorePath, }; -use std::io::Read; use std::{ any::Any, error::Error, @@ -568,7 +565,9 @@ pub(crate) async fn get_object_store( .with_url(url.origin().ascii_serialization()) .build()?, ), - STDIN_SCHEME => stdin_object_store(url).await?, + _ if scheme == stdin::StdinUtils::SCHEME => { + stdin::StdinUtils::object_store(url).await? + } _ => { // For other types, try to get from `object_store_registry`: state @@ -583,75 +582,6 @@ pub(crate) async fn get_object_store( Ok(store) } -/// The URL scheme used to expose the process's standard input as an object -/// store, mirroring how `s3`, `gs`, `http`, etc. are addressed. -const STDIN_SCHEME: &str = "stdin"; - -/// Filesystem paths that refer to the process's standard input. 
-/// -/// These are intentionally limited to the well known pseudo-files exposed by -/// the operating system so that ordinary files are never accidentally treated -/// as stdin. -const STDIN_LOCATIONS: [&str; 3] = ["/dev/stdin", "/dev/fd/0", "/proc/self/fd/0"]; - -/// Returns `true` if `location` refers to the process's standard input. -fn is_stdin_location(location: &str) -> bool { - STDIN_LOCATIONS.contains(&location) -} - -/// Rewrites the well known stdin pseudo-paths (e.g. `/dev/stdin`) to a canonical -/// `stdin://` URL so that reading from standard input flows through the same -/// object-store/listing code path as any other scheme. Non-stdin locations are -/// returned unchanged. -/// -/// The listing layer filters candidate files by extension, so the canonical -/// object is named with the extension matching the declared `STORED AS` format. -pub(crate) fn rewrite_stdin_location( - location: &str, - format: Option<&ConfigFileType>, -) -> String { - if !is_stdin_location(location) { - return location.to_string(); - } - - let object_name = match format { - Some(ConfigFileType::CSV) => "stdin.csv", - Some(ConfigFileType::JSON) => "stdin.json", - Some(ConfigFileType::PARQUET) => "stdin.parquet", - _ => "stdin", - }; - format!("{STDIN_SCHEME}:///{object_name}") -} - -/// Builds the object store backing the `stdin://` scheme by reading all of -/// standard input into memory. -/// -/// A pipe (e.g. `cat data.csv | datafusion-cli`) is not seekable and reports a -/// size of `0`, so it cannot be read directly by the file based formats (CSV -/// requires seeking, Parquet needs the footer at the end of the file). Buffering -/// the whole input up front sidesteps these limitations and lets the data be -/// read like any other object, including being scanned more than once. 
-async fn stdin_object_store(url: &Url) -> Result> { - let mut buffer = Vec::new(); - std::io::stdin() - .lock() - .read_to_end(&mut buffer) - .map_err(|e| exec_datafusion_err!("Failed to read from stdin: {e}"))?; - in_memory_object_store(url, buffer).await -} - -/// Stores `data` at the path referenced by `url` in a fresh [`InMemory`] store. -async fn in_memory_object_store( - url: &Url, - data: Vec, -) -> Result> { - let store = InMemory::new(); - store - .put(&ObjectStorePath::from_url_path(url.path())?, data.into()) - .await?; - Ok(Arc::new(store)) -} - #[cfg(test)] mod tests { use crate::cli_context::CliSessionContext; @@ -991,107 +921,4 @@ mod tests { } Ok(()) } - - #[test] - fn rewrites_stdin_locations() { - // stdin pseudo-paths are rewritten to a `stdin://` URL carrying the - // extension that matches the declared format. - assert_eq!( - rewrite_stdin_location("/dev/stdin", Some(&ConfigFileType::CSV)), - "stdin:///stdin.csv" - ); - assert_eq!( - rewrite_stdin_location("/dev/fd/0", Some(&ConfigFileType::JSON)), - "stdin:///stdin.json" - ); - assert_eq!( - rewrite_stdin_location("/proc/self/fd/0", Some(&ConfigFileType::PARQUET)), - "stdin:///stdin.parquet" - ); - assert_eq!(rewrite_stdin_location("/dev/stdin", None), "stdin:///stdin"); - - // Ordinary locations are left untouched. - for location in ["/dev/stdout", "data/stdin.csv", "stdin", "s3://b/f.csv"] { - assert_eq!( - rewrite_stdin_location(location, Some(&ConfigFileType::CSV)), - location - ); - } - } - - /// Buffers `data` into the `stdin://` object store and reads it back through - /// a `CREATE EXTERNAL TABLE`, returning the number of rows in the table. - /// - /// This exercises the full path used for `/dev/stdin` short of the actual - /// stdin read, which cannot be driven from a unit test. 
- async fn count_stdin_rows( - data: Vec, - stored_as: &str, - format: Option, - options: &str, - ) -> Result { - let location = rewrite_stdin_location("/dev/stdin", format.as_ref()); - let url = Url::parse(&location).unwrap(); - let store = in_memory_object_store(&url, data).await?; - - let ctx = SessionContext::new(); - ctx.register_object_store(&url, store); - ctx.sql(&format!( - "CREATE EXTERNAL TABLE t STORED AS {stored_as} LOCATION '{location}' {options}" - )) - .await? - .collect() - .await?; - - ctx.sql("SELECT * FROM t").await?.count().await - } - - #[tokio::test] - async fn stdin_object_store_reads_csv() -> Result<()> { - let data = b"a,b\n1,foo\n2,bar\n".to_vec(); - let rows = count_stdin_rows( - data, - "CSV", - Some(ConfigFileType::CSV), - "OPTIONS ('format.has_header' 'true')", - ) - .await?; - assert_eq!(rows, 2); - Ok(()) - } - - #[tokio::test] - async fn stdin_object_store_reads_json() -> Result<()> { - let data = b"{\"a\": 1, \"b\": \"foo\"}\n{\"a\": 2, \"b\": \"bar\"}\n".to_vec(); - let rows = count_stdin_rows(data, "JSON", Some(ConfigFileType::JSON), "").await?; - assert_eq!(rows, 2); - Ok(()) - } - - #[tokio::test] - async fn stdin_object_store_reads_parquet() -> Result<()> { - use datafusion::arrow::array::Int32Array; - use datafusion::arrow::datatypes::{DataType, Field, Schema}; - use datafusion::arrow::record_batch::RecordBatch; - use parquet::arrow::ArrowWriter; - - // Parquet requires random access to the footer, which a real pipe cannot - // provide; the in-memory buffer makes this work. 
- let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], - ) - .unwrap(); - - let mut data = Vec::new(); - let mut writer = ArrowWriter::try_new(&mut data, schema, None).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - - let rows = - count_stdin_rows(data, "PARQUET", Some(ConfigFileType::PARQUET), "").await?; - assert_eq!(rows, 3); - Ok(()) - } } diff --git a/datafusion-cli/src/object_storage/stdin.rs b/datafusion-cli/src/object_storage/stdin.rs new file mode 100644 index 0000000000000..b534fe0056ec8 --- /dev/null +++ b/datafusion-cli/src/object_storage/stdin.rs @@ -0,0 +1,230 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Exposes the process's standard input as a `stdin://` object store so that +//! piped data (e.g. `cat data.csv | datafusion-cli`) can be queried via +//! `CREATE EXTERNAL TABLE ... LOCATION '/dev/stdin'`. 
+ +use std::io::Read; +use std::sync::Arc; + +use datafusion::common::exec_datafusion_err; +use datafusion::config::ConfigFileType; +use datafusion::error::Result; + +use object_store::memory::InMemory; +use object_store::path::Path as ObjectStorePath; +use object_store::{ObjectStore, ObjectStoreExt}; +use url::Url; + +/// Utilities for exposing the process's standard input as an object store. +/// +/// stdin is surfaced as a `stdin://` object store and dispatched alongside the +/// other schemes (`s3`, `gs`, `http`, ...) so that reading piped data flows +/// through the normal object-store/listing code path, conceptually similar to +/// DuckDB's `PipeFileSystem`. +pub(crate) struct StdinUtils; + +impl StdinUtils { + /// The URL scheme used to expose stdin as an object store, mirroring how + /// `s3`, `gs`, `http`, etc. are addressed. + pub(crate) const SCHEME: &'static str = "stdin"; + + /// Filesystem paths that refer to the process's standard input. + /// + /// These are intentionally limited to the well known pseudo-files exposed + /// by the operating system so that ordinary files are never accidentally + /// treated as stdin. + const LOCATIONS: [&'static str; 3] = ["/dev/stdin", "/dev/fd/0", "/proc/self/fd/0"]; + + /// Returns `true` if `location` refers to the process's standard input. + fn is_stdin_location(location: &str) -> bool { + Self::LOCATIONS.contains(&location) + } + + /// Rewrites the well known stdin pseudo-paths (e.g. `/dev/stdin`) to a + /// canonical `stdin://` URL so that reading from standard input flows + /// through the same object-store/listing code path as any other scheme. + /// Non-stdin locations are returned unchanged. + /// + /// The listing layer filters candidate files by extension, so the canonical + /// object is named with the extension matching the declared `STORED AS` + /// format. 
+ pub(crate) fn rewrite_location( + location: &str, + format: Option<&ConfigFileType>, + ) -> String { + if !Self::is_stdin_location(location) { + return location.to_string(); + } + + let object_name = match format { + Some(ConfigFileType::CSV) => "stdin.csv", + Some(ConfigFileType::JSON) => "stdin.json", + Some(ConfigFileType::PARQUET) => "stdin.parquet", + _ => "stdin", + }; + format!("{}:///{object_name}", Self::SCHEME) + } + + /// Builds the object store backing the `stdin://` scheme by reading all of + /// standard input into memory. + /// + /// A pipe (e.g. `cat data.csv | datafusion-cli`) is not seekable and reports + /// a size of `0`, so it cannot be read directly by the file based formats + /// (CSV requires seeking, Parquet needs the footer at the end of the file). + /// Buffering the whole input up front sidesteps these limitations and lets + /// the data be read like any other object, including being scanned more than + /// once. + pub(crate) async fn object_store(url: &Url) -> Result> { + let mut buffer = Vec::new(); + std::io::stdin() + .lock() + .read_to_end(&mut buffer) + .map_err(|e| exec_datafusion_err!("Failed to read from stdin: {e}"))?; + Self::in_memory_object_store(url, buffer).await + } + + /// Stores `data` at the path referenced by `url` in a fresh [`InMemory`] + /// store. + async fn in_memory_object_store( + url: &Url, + data: Vec, + ) -> Result> { + let store = InMemory::new(); + store + .put(&ObjectStorePath::from_url_path(url.path())?, data.into()) + .await?; + Ok(Arc::new(store)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use datafusion::prelude::SessionContext; + + #[test] + fn rewrites_stdin_locations() { + // stdin pseudo-paths are rewritten to a `stdin://` URL carrying the + // extension that matches the declared format. 
+ assert_eq!( + StdinUtils::rewrite_location("/dev/stdin", Some(&ConfigFileType::CSV)), + "stdin:///stdin.csv" + ); + assert_eq!( + StdinUtils::rewrite_location("/dev/fd/0", Some(&ConfigFileType::JSON)), + "stdin:///stdin.json" + ); + assert_eq!( + StdinUtils::rewrite_location( + "/proc/self/fd/0", + Some(&ConfigFileType::PARQUET) + ), + "stdin:///stdin.parquet" + ); + assert_eq!( + StdinUtils::rewrite_location("/dev/stdin", None), + "stdin:///stdin" + ); + + // Ordinary locations are left untouched. + for location in ["/dev/stdout", "data/stdin.csv", "stdin", "s3://b/f.csv"] { + assert_eq!( + StdinUtils::rewrite_location(location, Some(&ConfigFileType::CSV)), + location + ); + } + } + + /// Buffers `data` into the `stdin://` object store and reads it back through + /// a `CREATE EXTERNAL TABLE`, returning the number of rows in the table. + /// + /// This exercises the full path used for `/dev/stdin` short of the actual + /// stdin read, which cannot be driven from a unit test. + async fn count_stdin_rows( + data: Vec, + stored_as: &str, + format: Option, + options: &str, + ) -> Result { + let location = StdinUtils::rewrite_location("/dev/stdin", format.as_ref()); + let url = Url::parse(&location).unwrap(); + let store = StdinUtils::in_memory_object_store(&url, data).await?; + + let ctx = SessionContext::new(); + ctx.register_object_store(&url, store); + ctx.sql(&format!( + "CREATE EXTERNAL TABLE t STORED AS {stored_as} LOCATION '{location}' {options}" + )) + .await? 
+ .collect() + .await?; + + ctx.sql("SELECT * FROM t").await?.count().await + } + + #[tokio::test] + async fn stdin_object_store_reads_csv() -> Result<()> { + let data = b"a,b\n1,foo\n2,bar\n".to_vec(); + let rows = count_stdin_rows( + data, + "CSV", + Some(ConfigFileType::CSV), + "OPTIONS ('format.has_header' 'true')", + ) + .await?; + assert_eq!(rows, 2); + Ok(()) + } + + #[tokio::test] + async fn stdin_object_store_reads_json() -> Result<()> { + let data = b"{\"a\": 1, \"b\": \"foo\"}\n{\"a\": 2, \"b\": \"bar\"}\n".to_vec(); + let rows = count_stdin_rows(data, "JSON", Some(ConfigFileType::JSON), "").await?; + assert_eq!(rows, 2); + Ok(()) + } + + #[tokio::test] + async fn stdin_object_store_reads_parquet() -> Result<()> { + use datafusion::arrow::array::Int32Array; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::arrow::record_batch::RecordBatch; + use parquet::arrow::ArrowWriter; + + // Parquet requires random access to the footer, which a real pipe cannot + // provide; the in-memory buffer makes this work. + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + + let mut data = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut data, schema, None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let rows = + count_stdin_rows(data, "PARQUET", Some(ConfigFileType::PARQUET), "").await?; + assert_eq!(rows, 3); + Ok(()) + } +} From 0a4dda561683a240a64e83d07936f6d03465f348 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 9 Jun 2026 08:36:25 -0700 Subject: [PATCH 3/6] fix: reuse buffered stdin store for repeated stdin:// tables stdin is one-shot. The object store registry keys by scheme/authority, so a second CREATE EXTERNAL TABLE ... 
LOCATION '/dev/stdin' previously re-read (now-EOF) stdin, built an empty store, and overwrote the populated one, silently emptying the first table. get_or_create now returns the already-registered store instead of re-reading stdin. --- datafusion-cli/src/object_storage.rs | 2 +- datafusion-cli/src/object_storage/stdin.rs | 42 +++++++++++++++++++++- docs/source/user-guide/cli/datasources.md | 4 +-- 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 191c7b2179e42..95c9125d57b3e 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -566,7 +566,7 @@ pub(crate) async fn get_object_store( .build()?, ), _ if scheme == stdin::StdinUtils::SCHEME => { - stdin::StdinUtils::object_store(url).await? + stdin::StdinUtils::get_or_create(state, url).await? } _ => { // For other types, try to get from `object_store_registry`: diff --git a/datafusion-cli/src/object_storage/stdin.rs b/datafusion-cli/src/object_storage/stdin.rs index b534fe0056ec8..0234ba1fb69ce 100644 --- a/datafusion-cli/src/object_storage/stdin.rs +++ b/datafusion-cli/src/object_storage/stdin.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use datafusion::common::exec_datafusion_err; use datafusion::config::ConfigFileType; use datafusion::error::Result; +use datafusion::execution::context::SessionState; use object_store::memory::InMemory; use object_store::path::Path as ObjectStorePath; @@ -81,6 +82,26 @@ impl StdinUtils { format!("{}:///{object_name}", Self::SCHEME) } + /// Returns the object store backing the `stdin://` scheme, reading and + /// buffering standard input on first use and reusing that buffer for any + /// subsequent `stdin://` table created in the same session. + /// + /// stdin is a one-shot stream: it can only be read once. The object store + /// registry keys by scheme/authority, so every `stdin://` URL maps to the + /// same store. 
Without this guard, a second `CREATE EXTERNAL TABLE ... + /// LOCATION '/dev/stdin'` would re-read (now-EOF) stdin, build an empty + /// store, and overwrite the populated one, silently emptying the earlier + /// table. Reusing the already-registered store avoids that. + pub(crate) async fn get_or_create( + state: &SessionState, + url: &Url, + ) -> Result> { + if let Ok(existing) = state.runtime_env().object_store_registry.get_store(url) { + return Ok(existing); + } + Self::object_store(url).await + } + /// Builds the object store backing the `stdin://` scheme by reading all of /// standard input into memory. /// @@ -90,7 +111,7 @@ impl StdinUtils { /// Buffering the whole input up front sidesteps these limitations and lets /// the data be read like any other object, including being scanned more than /// once. - pub(crate) async fn object_store(url: &Url) -> Result> { + async fn object_store(url: &Url) -> Result> { let mut buffer = Vec::new(); std::io::stdin() .lock() @@ -179,6 +200,25 @@ mod tests { ctx.sql("SELECT * FROM t").await?.count().await } + #[tokio::test] + async fn reuses_buffered_stdin_store() -> Result<()> { + // stdin can only be read once, so a second `stdin://` table must reuse + // the store buffered by the first instead of re-reading (now-empty) + // stdin and overwriting it. 
+ let url = Url::parse("stdin:///stdin.csv").unwrap(); + let store = + StdinUtils::in_memory_object_store(&url, b"a\n1\n2\n".to_vec()).await?; + + let ctx = SessionContext::new(); + ctx.register_object_store(&url, store); + + let reused = StdinUtils::get_or_create(&ctx.state(), &url).await?; + let path = ObjectStorePath::from_url_path(url.path())?; + let bytes = reused.get(&path).await?.bytes().await?; + assert_eq!(bytes.as_ref(), b"a\n1\n2\n"); + Ok(()) + } + #[tokio::test] async fn stdin_object_store_reads_csv() -> Result<()> { let data = b"a,b\n1,foo\n2,bar\n".to_vec(); diff --git a/docs/source/user-guide/cli/datasources.md b/docs/source/user-guide/cli/datasources.md index 1a85b9bd9777d..53a21bd4f602b 100644 --- a/docs/source/user-guide/cli/datasources.md +++ b/docs/source/user-guide/cli/datasources.md @@ -146,8 +146,8 @@ SELECT count(*) FROM hits;" This works for CSV, JSON, and Parquet. Because standard input is not seekable (and Parquet stores its metadata at the end of the file), the CLI buffers the entire input into memory before querying it, so the data must fit in memory. -Standard input can only be consumed once, so a single session can create at most -one table backed by `/dev/stdin`. +Standard input is read only once: the buffered contents are reused for any +further tables backed by `/dev/stdin` in the same session. **Why Wildcards Are Not Supported** From 006c1c964cd36bdaee5dc94f2e1d259cfa5dea04 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 9 Jun 2026 08:57:42 -0700 Subject: [PATCH 4/6] test: cover repeated stdin tables end-to-end Add an integration test that creates two /dev/stdin tables in one session and asserts both still report the buffered rows, guarding the one-shot stdin reuse fix at the binary level. Factor the pipe-spawning boilerplate into a shared run_cli_with_stdin helper. 
--- datafusion-cli/tests/cli_integration.rs | 68 ++++++++++++++++++------- 1 file changed, 50 insertions(+), 18 deletions(-) diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index c0947c077bd6e..dee5d1fde7f4a 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -181,42 +181,74 @@ fn cli_quick_test<'a>( #[cfg(unix)] #[test] fn test_cli_read_from_stdin() { + let stdout = run_cli_with_stdin( + "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin' \ + OPTIONS ('format.has_header' 'true'); \ + SELECT b, count(*) AS c FROM t GROUP BY b ORDER BY b;", + b"a,b\n1,foo\n2,bar\n3,foo\n", + ); + + assert!( + stdout.contains("| foo | 2 |") && stdout.contains("| bar | 1 |"), + "unexpected output:\n{stdout}" + ); +} + +/// stdin is a one-shot stream, so a second `/dev/stdin` table in the same +/// session must reuse the buffered input rather than re-reading (now-empty) +/// stdin and silently emptying the first table. +#[cfg(unix)] +#[test] +fn test_cli_read_from_stdin_twice_reuses_buffer() { + let stdout = run_cli_with_stdin( + "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin' \ + OPTIONS ('format.has_header' 'true'); \ + CREATE EXTERNAL TABLE t2 STORED AS CSV LOCATION '/dev/stdin' \ + OPTIONS ('format.has_header' 'true'); \ + SELECT count(*) AS t_count FROM t; \ + SELECT count(*) AS t2_count FROM t2;", + b"a,b\n1,foo\n2,bar\n", + ); + + // Both tables must still see the two buffered rows; before the fix the + // first table's store was overwritten with an empty one (t_count = 0). + let counts: Vec<&str> = stdout + .lines() + .filter(|line| line.trim_start().starts_with("| 2 ")) + .collect(); + assert_eq!( + counts.len(), + 2, + "expected both stdin tables to report 2 rows, got:\n{stdout}" + ); +} + +/// Spawns the real `datafusion-cli` binary, pipes `stdin` into it, and returns +/// its stdout after asserting a successful exit. 
+#[cfg(unix)] +fn run_cli_with_stdin(command: &str, stdin: &[u8]) -> String { use std::io::Write; use std::process::Stdio; let mut child = cli() - .args([ - "-q", - "--command", - "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin' \ - OPTIONS ('format.has_header' 'true'); \ - SELECT b, count(*) AS c FROM t GROUP BY b ORDER BY b;", - ]) + .args(["-q", "--command", command]) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() .expect("failed to spawn datafusion-cli"); - child - .stdin - .take() - .unwrap() - .write_all(b"a,b\n1,foo\n2,bar\n3,foo\n") - .unwrap(); + child.stdin.take().unwrap().write_all(stdin).unwrap(); let output = child.wait_with_output().unwrap(); - let stdout = String::from_utf8_lossy(&output.stdout); + let stdout = String::from_utf8_lossy(&output.stdout).into_owned(); let stderr = String::from_utf8_lossy(&output.stderr); assert!( output.status.success(), "datafusion-cli failed.\nstdout:\n{stdout}\nstderr:\n{stderr}" ); - assert!( - stdout.contains("| foo | 2 |") && stdout.contains("| bar | 1 |"), - "unexpected output:\n{stdout}" - ); + stdout } #[test] From 598951d4f24bfd288b8d8ce3006ad1dd5b41b74f Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 9 Jun 2026 10:30:11 -0700 Subject: [PATCH 5/6] test: drop redundant comment in stdin reuse test --- datafusion-cli/tests/cli_integration.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index dee5d1fde7f4a..9582f9e1d1f1e 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -210,8 +210,7 @@ fn test_cli_read_from_stdin_twice_reuses_buffer() { b"a,b\n1,foo\n2,bar\n", ); - // Both tables must still see the two buffered rows; before the fix the - // first table's store was overwritten with an empty one (t_count = 0). + // Both tables must still see the two buffered rows. 
let counts: Vec<&str> = stdout .lines() .filter(|line| line.trim_start().starts_with("| 2 ")) From 928c06ab23e2f15cdce86adf5231028a8b7709bf Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 10 Jun 2026 00:18:20 -0700 Subject: [PATCH 6/6] fix: reject mismatched stdin formats; error when stdin carries SQL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The object store registry keys by scheme/authority, so a second /dev/stdin table with a different STORED AS format reused the registered store but looked up a per-format object path it did not contain. The buffered object's name records the format stdin was consumed as; a later stdin-backed table declaring a different format now fails with a clear error. Sharing the bytes across formats could be silently wrong — e.g. piped NDJSON read by a second CSV table "succeeds" as a one-column table instead of surfacing the mistake. Same-format reuse still works. Also error clearly when stdin cannot carry data: when the REPL owns stdin (interactive, or SQL piped without -c/-f), buffering it for a table would consume the remaining statements as table data (silently, exit 0); when stdin is a TTY the read would block until Ctrl-D with Ctrl-C swallowed. 
Co-authored-by: Isaac --- datafusion-cli/src/main.rs | 22 +++- datafusion-cli/src/object_storage.rs | 2 + datafusion-cli/src/object_storage/stdin.rs | 117 +++++++++++++++++++-- datafusion-cli/tests/cli_integration.rs | 89 ++++++++++++++++ docs/source/user-guide/cli/datasources.md | 9 +- 5 files changed, 229 insertions(+), 10 deletions(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 935bf0a9744dd..e6da95af2eec9 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -34,6 +34,7 @@ use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::{ ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc, }; +use datafusion_cli::object_storage::StdinCarriesCommands; use datafusion_cli::object_storage::instrumented::{ InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, }; @@ -158,6 +159,14 @@ struct Args { object_store_profiling: InstrumentedObjectStoreMode, } +impl Args { + /// Without -c/-f the CLI enters the REPL, which reads its SQL from + /// stdin — interactively or piped. 
+ fn repl_mode(&self) -> bool { + self.command.is_empty() && self.file.is_empty() + } +} + #[tokio::main] /// Calls [`main_inner`], then handles printing errors and returning the correct exit code pub async fn main() -> ExitCode { @@ -268,6 +277,7 @@ async fn main_inner() -> Result<()> { instrumented_registry: Arc::clone(&instrumented_registry), }; + let repl_mode = args.repl_mode(); let commands = args.command; let files = args.file; let rc = match args.rc { @@ -285,7 +295,7 @@ async fn main_inner() -> Result<()> { } }; - if commands.is_empty() && files.is_empty() { + if repl_mode { if !rc.is_empty() { exec::exec_from_files(&ctx, rc, &print_options).await?; } @@ -330,8 +340,16 @@ fn get_session_config(args: &Args) -> Result<SessionConfig> { config_options.format.null = String::from("NULL"); } - let session_config = + let mut session_config = SessionConfig::from(config_options).with_information_schema(true); + + if args.repl_mode() { + // In the REPL stdin carries the SQL (including for any rc file run + // before it), so it cannot also serve as a data source for + // `LOCATION '/dev/stdin'`. + session_config = session_config.with_extension(Arc::new(StdinCarriesCommands)); + } + Ok(session_config) } diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 95c9125d57b3e..c11c87a97aff7 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -18,6 +18,8 @@ pub mod instrumented; pub(crate) mod stdin; +pub use stdin::StdinCarriesCommands; + use async_trait::async_trait; use aws_config::BehaviorVersion; use aws_credential_types::provider::{ diff --git a/datafusion-cli/src/object_storage/stdin.rs b/datafusion-cli/src/object_storage/stdin.rs index 0234ba1fb69ce..b83c2c64b53b4 100644 --- a/datafusion-cli/src/object_storage/stdin.rs +++ b/datafusion-cli/src/object_storage/stdin.rs @@ -19,19 +19,29 @@ //! piped data (e.g. `cat data.csv | datafusion-cli`) can be queried via //! `CREATE EXTERNAL TABLE ... 
LOCATION '/dev/stdin'`. -use std::io::Read; +use std::io::{IsTerminal, Read}; use std::sync::Arc; use datafusion::common::exec_datafusion_err; use datafusion::config::ConfigFileType; use datafusion::error::Result; use datafusion::execution::context::SessionState; +use futures::TryStreamExt; use object_store::memory::InMemory; use object_store::path::Path as ObjectStorePath; use object_store::{ObjectStore, ObjectStoreExt}; use url::Url; +/// Marker [`SessionConfig`] extension recording that the session reads its SQL +/// commands from stdin (the interactive or piped REPL). stdin cannot then also +/// serve as a data source: reading it for table data would silently consume +/// the remaining SQL statements. +/// +/// [`SessionConfig`]: datafusion::execution::context::SessionConfig +#[derive(Debug)] +pub struct StdinCarriesCommands; + /// Utilities for exposing the process's standard input as an object store. /// /// stdin is surfaced as a `stdin://` object store and dispatched alongside the @@ -64,7 +74,10 @@ impl StdinUtils { /// /// The listing layer filters candidate files by extension, so the canonical /// object is named with the extension matching the declared `STORED AS` - /// format. + /// format. The name thereby also records which format stdin was consumed + /// as: a later stdin-backed table declaring a different format resolves to + /// a path the buffered store does not contain and is rejected by + /// [`Self::get_or_create`]. pub(crate) fn rewrite_location( location: &str, format: Option<&ConfigFileType>, @@ -92,14 +105,38 @@ impl StdinUtils { /// LOCATION '/dev/stdin'` would re-read (now-EOF) stdin, build an empty /// store, and overwrite the populated one, silently emptying the earlier /// table. Reusing the already-registered store avoids that. 
+ /// + /// A later stdin-backed table declaring a different `STORED AS` format + /// resolves to an object the store does not contain (the object name + /// records the format stdin was consumed as) and is rejected with a clear + /// error — both reading the buffer as another format and re-reading stdin + /// would be silently wrong. pub(crate) async fn get_or_create( state: &SessionState, url: &Url, ) -> Result<Arc<dyn ObjectStore>> { - if let Ok(existing) = state.runtime_env().object_store_registry.get_store(url) { - return Ok(existing); + let Ok(existing) = state.runtime_env().object_store_registry.get_store(url) + else { + return Self::object_store(state, url).await; + }; + + let path = ObjectStorePath::from_url_path(url.path())?; + if existing.head(&path).await.is_err() { + let buffered = existing + .list(None) + .try_next() + .await + .ok() + .flatten() + .map(|meta| format!(" as '{}'", meta.location)) + .unwrap_or_default(); + return Err(exec_datafusion_err!( + "stdin was already read{buffered} by an earlier statement; all \ + tables backed by stdin in a session must declare the same \ + STORED AS format" + )); } - Self::object_store(url).await + Ok(existing) } /// Builds the object store backing the `stdin://` scheme by reading all of @@ -111,7 +148,29 @@ impl StdinUtils { /// Buffering the whole input up front sidesteps these limitations and lets /// the data be read like any other object, including being scanned more than /// once. - async fn object_store(url: &Url) -> Result<Arc<dyn ObjectStore>> { + async fn object_store( + state: &SessionState, + url: &Url, + ) -> Result<Arc<dyn ObjectStore>> { + if state + .config() + .get_extension::<StdinCarriesCommands>() + .is_some() + { + return Err(exec_datafusion_err!( + "stdin is already being read for SQL commands, so it cannot \ + also supply table data; pass the query with -c/--command or \ + -f/--file so that stdin carries the data, e.g. 
\ + `cat data.csv | datafusion-cli -f query.sql`" + )); + } + if std::io::stdin().is_terminal() { + return Err(exec_datafusion_err!( + "stdin is connected to a terminal, not piped data; pipe the \ + input in, e.g. `cat data.csv | datafusion-cli -f query.sql`" + )); + } + let mut buffer = Vec::new(); std::io::stdin() .lock() @@ -138,7 +197,7 @@ impl StdinUtils { mod tests { use super::*; - use datafusion::prelude::SessionContext; + use datafusion::prelude::{SessionConfig, SessionContext}; #[test] fn rewrites_stdin_locations() { @@ -219,6 +278,50 @@ mod tests { Ok(()) } + #[tokio::test] + async fn rejects_second_stdin_table_with_different_format() -> Result<()> { + // The buffered object's name records the format stdin was consumed + // as; a later stdin table declaring a different format must fail with + // a clear error rather than a downstream "not found" (or silently + // misreading the bytes as another format). + let csv_url = Url::parse("stdin:///stdin.csv").unwrap(); + let store = + StdinUtils::in_memory_object_store(&csv_url, b"a\n1\n".to_vec()).await?; + + let ctx = SessionContext::new(); + ctx.register_object_store(&csv_url, store); + + let json_url = Url::parse("stdin:///stdin.json").unwrap(); + let err = StdinUtils::get_or_create(&ctx.state(), &json_url) + .await + .unwrap_err() + .to_string(); + assert!( + err.contains("must declare the same STORED AS format") + && err.contains("stdin.csv"), + "unexpected error: {err}" + ); + Ok(()) + } + + #[tokio::test] + async fn errors_when_stdin_carries_commands() { + // Once the REPL owns stdin for SQL commands, building the stdin store + // must fail with a clear error instead of swallowing the remaining + // statements as table data. 
+ let config = SessionConfig::new().with_extension(Arc::new(StdinCarriesCommands)); + let ctx = SessionContext::new_with_config(config); + + let url = Url::parse("stdin:///stdin.csv").unwrap(); + let err = StdinUtils::get_or_create(&ctx.state(), &url) + .await + .unwrap_err(); + assert!( + err.to_string().contains("SQL commands"), + "unexpected error: {err}" + ); + } + #[tokio::test] async fn stdin_object_store_reads_csv() -> Result<()> { let data = b"a,b\n1,foo\n2,bar\n".to_vec(); diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 9582f9e1d1f1e..f99d42d1b4200 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -222,6 +222,95 @@ fn test_cli_read_from_stdin_twice_reuses_buffer() { ); } +/// A later `/dev/stdin` table declaring a different `STORED AS` format must be +/// rejected with a clear error: stdin is one-shot, its bytes were already +/// buffered under the first table's format, and silently reading them as +/// another format would be wrong. +#[cfg(unix)] +#[test] +fn test_cli_read_from_stdin_mixed_formats_rejected() { + use std::io::Write; + use std::process::Stdio; + + let mut child = cli() + .args([ + "-q", + "--command", + "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin' \ + OPTIONS ('format.has_header' 'true'); \ + CREATE EXTERNAL TABLE t2 STORED AS JSON LOCATION '/dev/stdin';", + ]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("failed to spawn datafusion-cli"); + + child + .stdin + .take() + .unwrap() + .write_all(b"a,b\n1,foo\n2,bar\n") + .unwrap(); + + let output = child.wait_with_output().unwrap(); + // Fatal errors in `--command` mode are reported on stdout. 
+ let stdout = String::from_utf8_lossy(&output.stdout); + + assert!( + !output.status.success(), + "expected the mismatched format to fail, stdout:\n{stdout}" + ); + assert!( + stdout.contains("must declare the same STORED AS format"), + "expected a clear mismatch error, got:\n{stdout}" + ); +} + +/// When the SQL itself arrives on stdin (the piped REPL, e.g. `cat script.sql +/// | datafusion-cli`), stdin cannot double as a data source: the statement +/// must fail with a clear error instead of silently consuming the rest of the +/// script as table data, and the remaining statements must still run. +#[cfg(unix)] +#[test] +fn test_cli_stdin_location_rejected_when_sql_comes_from_stdin() { + use std::io::Write; + use std::process::Stdio; + + let mut child = cli() + .arg("-q") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("failed to spawn datafusion-cli"); + + child + .stdin + .take() + .unwrap() + .write_all( + b"CREATE EXTERNAL TABLE t STORED AS CSV LOCATION '/dev/stdin';\n\ + SELECT 123 + 456;\n", + ) + .unwrap(); + + let output = child.wait_with_output().unwrap(); + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + + assert!( + stderr.contains("SQL commands"), + "expected a clear error about stdin carrying SQL.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); + // The statement after the failed CREATE must still execute rather than + // being consumed as table data. + assert!( + stdout.contains("579"), + "expected the following statement to still run.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); +} + /// Spawns the real `datafusion-cli` binary, pipes `stdin` into it, and returns /// its stdout after asserting a successful exit. 
#[cfg(unix)] diff --git a/docs/source/user-guide/cli/datasources.md b/docs/source/user-guide/cli/datasources.md index 53a21bd4f602b..59a6b0aa43284 100644 --- a/docs/source/user-guide/cli/datasources.md +++ b/docs/source/user-guide/cli/datasources.md @@ -147,7 +147,14 @@ This works for CSV, JSON, and Parquet. Because standard input is not seekable (and Parquet stores its metadata at the end of the file), the CLI buffers the entire input into memory before querying it, so the data must fit in memory. Standard input is read only once: the buffered contents are reused for any -further tables backed by `/dev/stdin` in the same session. +further tables backed by `/dev/stdin` in the same session. Those tables must +declare the same `STORED AS` format as the first one; a differing format is +rejected with an error. + +The SQL must be passed with `-c`/`--command` or `-f`/`--file` so that standard +input is free to carry the data. In the interactive shell (and when SQL is +piped to the CLI without `-c`/`-f`) standard input carries the SQL itself, and +`LOCATION '/dev/stdin'` returns an error. **Why Wildcards Are Not Supported**