Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions datafusion/catalog-listing/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use datafusion_datasource::file_format::FileFormat;
use datafusion_execution::config::SessionConfig;
use datafusion_expr::SortExpr;
use futures::StreamExt;
use futures::{TryStreamExt, future};
use futures::TryStreamExt;
use itertools::Itertools;
use std::sync::Arc;

Expand Down Expand Up @@ -263,7 +263,15 @@ impl ListingOptions {
/// Infer the schema of the files at the given path on the provided object store.
///
/// If the table_path contains one or more files (i.e. it is a directory /
/// prefix of files) their schema is merged by calling [`FileFormat::infer_schema`]
/// prefix of files) their schema is merged by calling [`FileFormat::infer_schema`].
///
/// Returns a `Plan` error if `table_path` contains no files at all (e.g. an
/// empty or non-existent directory), since an inferred schema with zero
/// columns produces confusing "column not found" errors at query time.
/// Callers that need to support empty locations must declare an explicit
/// schema instead of relying on inference. Locations that contain files
/// which all happen to be 0-byte are still accepted — the empty files are
/// filtered out before format-specific inference runs.
///
/// Note: The inferred schema does not include any partitioning columns.
///
Expand All @@ -275,14 +283,27 @@ impl ListingOptions {
) -> datafusion_common::Result<SchemaRef> {
let store = state.runtime_env().object_store(table_path)?;

let files: Vec<_> = table_path
let all_files: Vec<_> = table_path
.list_all_files(state, store.as_ref(), &self.file_extension)
.await?
// Empty files cannot affect schema but may throw when trying to read for it
.try_filter(|object_meta| future::ready(object_meta.size > 0))
Comment on lines -286 to -287
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just means we carry around memory for the ObjectMeta of zero-sized files until a couple of lines later. I think this is not a big problem.

The alternative is that we error even when there are 0-byte files present. I think that's an interesting discussion: e.g. a completely empty data.csv, or Hive-partitioned directories with no data. I think all of these should still require an explicit schema or error, but there are tests that check the opposite behavior:

  • test_csv_empty_file — registers tests/data/empty_0_byte.csv (0 bytes, no header, no data) and runs SELECT * FROM empty.
  • test_csv_multiple_empty_files — folder of 0-byte CSVs. Same situation.
  • it_can_read_empty_ndjson — 0-byte JSON file. Same.
  • test_read_empty_parquet — 0-byte parquet file. Same.
  • test_read_partitioned_empty_parquet — partition dir with a 0-byte parquet.

.try_collect()
.await?;

if all_files.is_empty() {
return plan_err!(
"No files found at {}. \
Cannot infer schema from an empty location; either add data files \
or declare an explicit schema for the table.",
table_path
);
}

// Empty files cannot affect schema but may throw when trying to read for it
let files: Vec<_> = all_files
.into_iter()
.filter(|object_meta| object_meta.size > 0)
.collect();

let schema = self.format.infer_schema(state, &store, &files).await?;

Ok(schema)
Expand Down
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ mod tests {
#[tokio::test]
async fn test_create_using_folder_with_compression() {
let dir = tempfile::tempdir().unwrap();
// Schema inference now requires at least one file at the location.
// The file itself can be 0-byte — it will be filtered out before the
// format-specific inference runs, leaving an empty inferred schema.
fs::File::create_new(dir.path().join("placeholder.csv.gz")).unwrap();

let factory = ListingTableFactory::new();
let context = SessionContext::new();
Expand Down Expand Up @@ -351,6 +355,9 @@ mod tests {
#[tokio::test]
async fn test_create_using_folder_without_compression() {
let dir = tempfile::tempdir().unwrap();
// See `test_create_using_folder_with_compression` — a placeholder file
// is required so schema inference does not error on an empty location.
fs::File::create_new(dir.path().join("placeholder.csv")).unwrap();

let factory = ListingTableFactory::new();
let context = SessionContext::new();
Expand Down Expand Up @@ -387,6 +394,8 @@ mod tests {
let mut path = PathBuf::from(dir.path());
path.extend(["odd.v1", "odd.v2"]);
fs::create_dir_all(&path).unwrap();
// Placeholder so schema inference does not error on an empty location.
fs::File::create_new(path.join("placeholder.parquet")).unwrap();

let factory = ListingTableFactory::new();
let context = SessionContext::new();
Expand Down
41 changes: 21 additions & 20 deletions datafusion/core/src/execution/context/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ mod tests {

use arrow::util::pretty::pretty_format_batches;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{
assert_batches_eq, assert_batches_sorted_eq, assert_contains,
};
use datafusion_common::{assert_batches_sorted_eq, assert_contains};
use datafusion_execution::config::SessionConfig;

use tempfile::{TempDir, tempdir};
Expand Down Expand Up @@ -374,20 +372,22 @@ mod tests {
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 5);

// Read the dataframe from 'output4/'
// Read the dataframe from 'output4/' — an empty folder. Inference now
// errors on an empty location instead of producing a 0-column table.
std::fs::create_dir(&path4)?;
let read_df = ctx
let err = ctx
.read_parquet(
&path4,
ParquetReadOptions {
..Default::default()
},
)
.await?;

let results = read_df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 0);
.await
.expect_err("read_parquet on an empty folder should error");
assert!(
err.strip_backtrace().contains("No files found at"),
"unexpected error: {err}"
);

// Read the dataframe from double dot folder;
let read_df = ctx
Expand Down Expand Up @@ -510,17 +510,18 @@ mod tests {
let ctx = SessionContext::new();
let test_path = "/foo/";

let actual = ctx
// Reading from a non-existent / empty location now errors at planning
// time rather than producing a 0-column table that surfaces a confusing
// "column not found" error at query time.
let err = ctx
.read_parquet(test_path, ParquetReadOptions::default())
.await?
.collect()
.await?;

#[cfg_attr(any(), rustfmt::skip)]
assert_batches_eq!(&[
"++",
"++",
], &actual);
.await
.expect_err("read_parquet on an empty location should error");
let msg = err.strip_backtrace();
assert!(
msg.contains("No files found at") && msg.contains(test_path),
"unexpected error: {msg}"
);

Ok(())
}
Expand Down
41 changes: 41 additions & 0 deletions datafusion/sqllogictest/test_files/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,47 @@ c1 Null 0
c2 Null 1
c3 Null 2

# Creating an external table over a location with no files and without an
# explicit schema should error rather than producing a 0-column table that
# fails with a confusing "column not found" error at query time.
statement error DataFusion error: Error during planning: No files found at .*\. Cannot infer schema from an empty location; either add data files or declare an explicit schema for the table\.
CREATE EXTERNAL TABLE empty_dir_parquet STORED AS PARQUET LOCATION 'test_files/scratch/ddl/empty_dir/';

statement error DataFusion error: Error during planning: No files found at .*\. Cannot infer schema from an empty location; either add data files or declare an explicit schema for the table\.
CREATE EXTERNAL TABLE empty_dir_csv STORED AS CSV LOCATION 'test_files/scratch/ddl/empty_dir/' OPTIONS ('format.has_header' 'true');

statement error DataFusion error: Error during planning: No files found at .*\. Cannot infer schema from an empty location; either add data files or declare an explicit schema for the table\.
CREATE EXTERNAL TABLE empty_dir_json STORED AS JSON LOCATION 'test_files/scratch/ddl/empty_dir/';

# Providing an explicit schema for the same empty location is still allowed,
# so users can pre-declare a table to be populated later via INSERT.
statement ok
CREATE EXTERNAL TABLE empty_dir_with_schema(x int) STORED AS PARQUET LOCATION 'test_files/scratch/ddl/empty_dir/';

query I
select * from empty_dir_with_schema;
----

statement ok
drop table empty_dir_with_schema;

# Once a file is written to the directory, schema inference works as before.
statement ok
COPY (values (1), (2), (3)) TO 'test_files/scratch/ddl/empty_dir_filled/' STORED AS PARQUET;

statement ok
CREATE EXTERNAL TABLE filled_dir_inferred STORED AS PARQUET LOCATION 'test_files/scratch/ddl/empty_dir_filled/';

query I rowsort
select * from filled_dir_inferred;
----
1
2
3

statement ok
drop table filled_dir_inferred;


## should allow any type of exprs as values
statement ok
Expand Down
Loading