diff --git a/datafusion/catalog-listing/src/options.rs b/datafusion/catalog-listing/src/options.rs index 146f98d62335e..0ab15e05abba1 100644 --- a/datafusion/catalog-listing/src/options.rs +++ b/datafusion/catalog-listing/src/options.rs @@ -23,7 +23,7 @@ use datafusion_datasource::file_format::FileFormat; use datafusion_execution::config::SessionConfig; use datafusion_expr::SortExpr; use futures::StreamExt; -use futures::{TryStreamExt, future}; +use futures::TryStreamExt; use itertools::Itertools; use std::sync::Arc; @@ -263,7 +263,15 @@ impl ListingOptions { /// Infer the schema of the files at the given path on the provided object store. /// /// If the table_path contains one or more files (i.e. it is a directory / - /// prefix of files) their schema is merged by calling [`FileFormat::infer_schema`] + /// prefix of files) their schema is merged by calling [`FileFormat::infer_schema`]. + /// + /// Returns a `Plan` error if `table_path` contains no files at all (e.g. an + /// empty or non-existent directory), since an inferred schema with zero + /// columns produces confusing "column not found" errors at query time. + /// Callers that need to support empty locations must declare an explicit + /// schema instead of relying on inference. Locations that contain files + /// which all happen to be 0-byte are still accepted — the empty files are + /// filtered out before format-specific inference runs. /// /// Note: The inferred schema does not include any partitioning columns. /// @@ -275,14 +283,27 @@ impl ListingOptions { ) -> datafusion_common::Result<SchemaRef> { let store = state.runtime_env().object_store(table_path)?; - let files: Vec<_> = table_path + let all_files: Vec<_> = table_path .list_all_files(state, store.as_ref(), &self.file_extension) .await? 
- // Empty files cannot affect schema but may throw when trying to read for it - .try_filter(|object_meta| future::ready(object_meta.size > 0)) .try_collect() .await?; + if all_files.is_empty() { + return plan_err!( + "No files found at {}. \ + Cannot infer schema from an empty location; either add data files \ + or declare an explicit schema for the table.", + table_path + ); + } + + // Empty files cannot affect schema but may throw when trying to read for it + let files: Vec<_> = all_files + .into_iter() + .filter(|object_meta| object_meta.size > 0) + .collect(); + let schema = self.format.infer_schema(state, &store, &files).await?; Ok(schema) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index a5139346752a9..f9ab87b528088 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -309,6 +309,10 @@ mod tests { #[tokio::test] async fn test_create_using_folder_with_compression() { let dir = tempfile::tempdir().unwrap(); + // Schema inference now requires at least one file at the location. + // The file itself can be 0-byte — it will be filtered out before the + // format-specific inference runs, leaving an empty inferred schema. + fs::File::create_new(dir.path().join("placeholder.csv.gz")).unwrap(); let factory = ListingTableFactory::new(); let context = SessionContext::new(); @@ -351,6 +355,9 @@ mod tests { #[tokio::test] async fn test_create_using_folder_without_compression() { let dir = tempfile::tempdir().unwrap(); + // See `test_create_using_folder_with_compression` — a placeholder file + // is required so schema inference does not error on an empty location. 
+ fs::File::create_new(dir.path().join("placeholder.csv")).unwrap(); let factory = ListingTableFactory::new(); let context = SessionContext::new(); @@ -387,6 +394,8 @@ mod tests { let mut path = PathBuf::from(dir.path()); path.extend(["odd.v1", "odd.v2"]); fs::create_dir_all(&path).unwrap(); + // Placeholder so schema inference does not error on an empty location. + fs::File::create_new(path.join("placeholder.parquet")).unwrap(); let factory = ListingTableFactory::new(); let context = SessionContext::new(); diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 823dc946ea732..3c750352f199a 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -108,9 +108,7 @@ mod tests { use arrow::util::pretty::pretty_format_batches; use datafusion_common::config::TableParquetOptions; - use datafusion_common::{ - assert_batches_eq, assert_batches_sorted_eq, assert_contains, - }; + use datafusion_common::{assert_batches_sorted_eq, assert_contains}; use datafusion_execution::config::SessionConfig; use tempfile::{TempDir, tempdir}; @@ -374,20 +372,22 @@ mod tests { let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); assert_eq!(total_rows, 5); - // Read the dataframe from 'output4/' + // Read the dataframe from 'output4/' — an empty folder. Inference now + // errors on an empty location instead of producing a 0-column table. 
std::fs::create_dir(&path4)?; - let read_df = ctx + let err = ctx .read_parquet( &path4, ParquetReadOptions { ..Default::default() }, ) - .await?; - - let results = read_df.collect().await?; - let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); - assert_eq!(total_rows, 0); + .await + .expect_err("read_parquet on an empty folder should error"); + assert!( + err.strip_backtrace().contains("No files found at"), + "unexpected error: {err}" + ); // Read the dataframe from double dot folder; let read_df = ctx @@ -510,17 +510,18 @@ mod tests { let ctx = SessionContext::new(); let test_path = "/foo/"; - let actual = ctx + // Reading from a non-existent / empty location now errors at planning + // time rather than producing a 0-column table that surfaces a confusing + // "column not found" error at query time. + let err = ctx .read_parquet(test_path, ParquetReadOptions::default()) - .await? - .collect() - .await?; - - #[cfg_attr(any(), rustfmt::skip)] - assert_batches_eq!(&[ - "++", - "++", - ], &actual); + .await + .expect_err("read_parquet on an empty location should error"); + let msg = err.strip_backtrace(); + assert!( + msg.contains("No files found at") && msg.contains(test_path), + "unexpected error: {msg}" + ); Ok(()) } diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index 0579659832feb..977d2d03a1d07 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -711,6 +711,47 @@ c1 Null 0 c2 Null 1 c3 Null 2 +# Creating an external table over a location with no files and without an +# explicit schema should error rather than producing a 0-column table that +# fails with a confusing "column not found" error at query time. +statement error DataFusion error: Error during planning: No files found at .*\. Cannot infer schema from an empty location; either add data files or declare an explicit schema for the table\. 
+CREATE EXTERNAL TABLE empty_dir_parquet STORED AS PARQUET LOCATION 'test_files/scratch/ddl/empty_dir/'; + +statement error DataFusion error: Error during planning: No files found at .*\. Cannot infer schema from an empty location; either add data files or declare an explicit schema for the table\. +CREATE EXTERNAL TABLE empty_dir_csv STORED AS CSV LOCATION 'test_files/scratch/ddl/empty_dir/' OPTIONS ('format.has_header' 'true'); + +statement error DataFusion error: Error during planning: No files found at .*\. Cannot infer schema from an empty location; either add data files or declare an explicit schema for the table\. +CREATE EXTERNAL TABLE empty_dir_json STORED AS JSON LOCATION 'test_files/scratch/ddl/empty_dir/'; + +# Providing an explicit schema for the same empty location is still allowed, +# so users can pre-declare a table to be populated later via INSERT. +statement ok +CREATE EXTERNAL TABLE empty_dir_with_schema(x int) STORED AS PARQUET LOCATION 'test_files/scratch/ddl/empty_dir/'; + +query I +select * from empty_dir_with_schema; +---- + +statement ok +drop table empty_dir_with_schema; + +# Once a file is written to the directory, schema inference works as before. +statement ok +COPY (values (1), (2), (3)) TO 'test_files/scratch/ddl/empty_dir_filled/' STORED AS PARQUET; + +statement ok +CREATE EXTERNAL TABLE filled_dir_inferred STORED AS PARQUET LOCATION 'test_files/scratch/ddl/empty_dir_filled/'; + +query I rowsort +select * from filled_dir_inferred; +---- +1 +2 +3 + +statement ok +drop table filled_dir_inferred; + ## should allow any type of exprs as values statement ok