Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::print_format::PrintFormat;
use crate::{
command::{Command, OutputFormat},
helper::CliHelper,
object_storage::get_object_store,
object_storage::{get_object_store, stdin::StdinUtils},
print_options::{MaxRows, PrintOptions},
};
use datafusion::common::instant::Instant;
Expand Down Expand Up @@ -418,9 +418,14 @@ async fn create_plan(
// Note that cmd is a mutable reference so that create_external_table function can remove all
// datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion
// will raise Configuration errors.
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
// To support custom formats, treat error as None
let format = config_file_type_from_str(&cmd.file_type);

// Expose stdin (e.g. `cat data.csv | datafusion-cli`) as a `stdin://`
// object store, registered like any other scheme in `get_object_store`.
cmd.location = StdinUtils::rewrite_location(&cmd.location, format.as_ref());

register_object_store_and_config_extensions(
ctx,
&cmd.location,
Expand Down
22 changes: 20 additions & 2 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::{
ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
};
use datafusion_cli::object_storage::StdinCarriesCommands;
use datafusion_cli::object_storage::instrumented::{
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
};
Expand Down Expand Up @@ -158,6 +159,14 @@ struct Args {
object_store_profiling: InstrumentedObjectStoreMode,
}

impl Args {
/// Without -c/-f the CLI enters the REPL, which reads its SQL from
/// stdin — interactively or piped.
fn repl_mode(&self) -> bool {
self.command.is_empty() && self.file.is_empty()
}
}

#[tokio::main]
/// Calls [`main_inner`], then handles printing errors and returning the correct exit code
pub async fn main() -> ExitCode {
Expand Down Expand Up @@ -268,6 +277,7 @@ async fn main_inner() -> Result<()> {
instrumented_registry: Arc::clone(&instrumented_registry),
};

let repl_mode = args.repl_mode();
let commands = args.command;
let files = args.file;
let rc = match args.rc {
Expand All @@ -285,7 +295,7 @@ async fn main_inner() -> Result<()> {
}
};

if commands.is_empty() && files.is_empty() {
if repl_mode {
if !rc.is_empty() {
exec::exec_from_files(&ctx, rc, &print_options).await?;
}
Expand Down Expand Up @@ -330,8 +340,16 @@ fn get_session_config(args: &Args) -> Result<SessionConfig> {
config_options.format.null = String::from("NULL");
}

let session_config =
let mut session_config =
SessionConfig::from(config_options).with_information_schema(true);

if args.repl_mode() {
// In the REPL stdin carries the SQL (including for any rc file run
// before it), so it cannot also serve as a data source for
// `LOCATION '/dev/stdin'`.
session_config = session_config.with_extension(Arc::new(StdinCarriesCommands));
}

Ok(session_config)
}

Expand Down
6 changes: 6 additions & 0 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
// under the License.

pub mod instrumented;
pub(crate) mod stdin;

pub use stdin::StdinCarriesCommands;

use async_trait::async_trait;
use aws_config::BehaviorVersion;
Expand Down Expand Up @@ -564,6 +567,9 @@ pub(crate) async fn get_object_store(
.with_url(url.origin().ascii_serialization())
.build()?,
),
_ if scheme == stdin::StdinUtils::SCHEME => {
stdin::StdinUtils::get_or_create(state, url).await?
}
_ => {
// For other types, try to get from `object_store_registry`:
state
Expand Down
Loading