From d77794b232f50a502fee3d130f1c84258e2f5fb9 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sun, 29 Oct 2023 17:05:34 +0100 Subject: [PATCH 01/16] feat: read files based on the file extention --- datafusion/core/src/execution/context/mod.rs | 22 ++++++++ .../core/src/execution/context/parquet.rs | 51 +++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index d523c39ee01e..3c2e8592cbb7 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -849,6 +849,28 @@ impl SessionContext { let table_paths = table_paths.to_urls()?; let session_config = self.copied_config(); let listing_options = options.to_listing_options(&session_config); + + let option_extention = listing_options.file_extension.clone(); + let filename = table_paths[0].prefix().filename(); + let extention = if let Some(filename) = filename { + let parts: Vec<&str> = filename.split('.').collect(); + + if parts.len() > 1 { + parts[1..].join(".") + } else { + "".to_owned() + } + } else { + "".to_owned() + }; + + if option_extention != extention && !extention.is_empty() { + return Err(DataFusionError::Execution(format!( + "File extension '{}' does not match the expected extension '{}'", + extention, option_extention + ))); + } + let resolved_schema = options .get_resolved_schema(&session_config, self.state(), table_paths[0].clone()) .await?; diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index dc202b9903f5..39332312c2c9 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -74,6 +74,11 @@ impl SessionContext { mod tests { use async_trait::async_trait; + use crate::arrow::array::{Float32Array, Int32Array}; + use crate::arrow::datatypes::{DataType, Field, Schema}; + use crate::arrow::record_batch::RecordBatch; + use crate::dataframe::DataFrameWriteOptions; + use crate::parquet::basic::Compression; use crate::test_util::parquet_test_data; use super::*; @@ -132,6 +137,52 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_from_wrong_file_extentnion() -> Result<()> { + let ctx = SessionContext::new(); + + // Make up a new dataframe. + let write_df = ctx.read_batch(RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("purchase_id", DataType::Int32, false), + Field::new("price", DataType::Float32, false), + Field::new("quantity", DataType::Int32, false), + ])), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])), + Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])), + ], + )?)?; + + write_df + .write_parquet( + "output.parquet.snappy", + DataFrameWriteOptions::new().with_single_file_output(true), + Some( + WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(), + ), + ) + .await?; + + let read_df = ctx + .read_parquet( + "output.parquet.snappy", + ParquetReadOptions { + ..Default::default() + }, + ) + .await; + assert_eq!( + read_df.unwrap_err().to_string(), + "Execution error: File extension 'parquet.snappy' does not match the expected extension '.parquet'" + ); + + Ok(()) + } + // Test for compilation error when calling read_* functions from an #[async_trait] function. // See https://github.com/apache/arrow-datafusion/issues/1154 #[async_trait] From 97bad2fa225244f8b61a8d5ea163dd11e451a342 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sun, 29 Oct 2023 17:28:46 +0100 Subject: [PATCH 02/16] fix: some the file extension might be started with . and some not --- datafusion/core/output.parquet.snappy | Bin 0 -> 846 bytes datafusion/core/src/execution/context/mod.rs | 7 ++++++- 2 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 datafusion/core/output.parquet.snappy diff --git a/datafusion/core/output.parquet.snappy b/datafusion/core/output.parquet.snappy new file mode 100644 index 0000000000000000000000000000000000000000..2e2eed2b00d7b89b30e16dcde821f6a554fba478 GIT binary patch literal 846 zcmaJ=F>BjU5I#xIif~MDg75GH5j>=i6jGBy10D>umk^2ulPoO+f^Y@#5ZjUMgsz=J z2z2RC8_%9Pbqe_jow{_(kfA@KW9~_bB-gltlaBAbyZi3@P^)?P8U=3Sj)!@K4S%7iD?1KMLEY{vbH%d_ey3jmdbudAs?$vB*Q8l!d;V;o&^-$o0cc zJ6K?l*3?9rM+_>s<)ITPQLO9jvlGQ+d=XMub7VNC7r`#I(Bm?YE?Q1HXBcX={sp9W_>+6=ycIoEhoMq>Tq(_|f>ymVze_>;zM0%8^=jTXx?4e_pHZV(E@hQa&Ny%{)KqY Date: Sun, 29 Oct 2023 21:47:28 +0100 Subject: [PATCH 03/16] fix: rename extention to extension --- datafusion/core/src/execution/context/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 9facc37f1a6c..d97ec3652aaa 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -852,7 +852,7 @@ impl SessionContext { let option_extention = listing_options.file_extension.clone(); let filename = table_paths[0].prefix().filename(); - let extention = if let Some(filename) = filename { + let extension = if let Some(filename) = filename { let parts: Vec<&str> = filename.split('.').collect(); if parts.len() > 1 { @@ -864,15 +864,15 @@ impl SessionContext { "".to_owned() }; // some the file extension might be started with "." and some not - let extention_alternative = ".".to_string() + extention.as_str(); + let extention_alternative = ".".to_string() + extension.as_str(); - if option_extention != extention + if option_extention != extension && option_extention != extention_alternative - && !extention.is_empty() + && !extension.is_empty() { return Err(DataFusionError::Execution(format!( "File extension '{}' does not match the expected extension '{}'", - extention, option_extention + extension, option_extention ))); } From b53f4a40c196dd9ee3dc455ae72b840692f0ae4d Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sun, 29 Oct 2023 21:49:32 +0100 Subject: [PATCH 04/16] chore: use exec_err --- datafusion/core/src/execution/context/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index d97ec3652aaa..3f27c4ad91a3 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -870,10 +870,11 @@ impl SessionContext { && option_extention != extention_alternative && !extension.is_empty() { - return Err(DataFusionError::Execution(format!( + return exec_err!( "File extension '{}' does not match the expected extension '{}'", - extension, option_extention - ))); + extension, + option_extention + ); } let resolved_schema = options From 47b5a9f43258de75d5dec2d434016e8fe2e777b4 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sun, 29 Oct 2023 22:00:31 +0100 Subject: [PATCH 05/16] chore: rename extention to extension --- datafusion/core/src/execution/context/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 3f27c4ad91a3..769af3861290 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -864,10 +864,10 @@ impl SessionContext { "".to_owned() }; // some the file extension might be started with "." and some not - let extention_alternative = ".".to_string() + extension.as_str(); + let extension_alternative = ".".to_string() + extension.as_str(); if option_extention != extension - && option_extention != extention_alternative + && option_extention != extension_alternative && !extension.is_empty() { return exec_err!( From a801398ddb59f4f5a9cd7245416bfa4674fe9031 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sun, 29 Oct 2023 22:00:50 +0100 Subject: [PATCH 06/16] chore: rename extention to extension --- datafusion/core/src/execution/context/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 769af3861290..b5bfe8800962 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -850,7 +850,7 @@ impl SessionContext { let session_config = self.copied_config(); let listing_options = options.to_listing_options(&session_config); - let option_extention = listing_options.file_extension.clone(); + let option_extension = listing_options.file_extension.clone(); let filename = table_paths[0].prefix().filename(); let extension = if let Some(filename) = filename { let parts: Vec<&str> = filename.split('.').collect(); @@ -866,14 +866,14 @@ impl SessionContext { // some the file extension might be started with "." and some not let extension_alternative = ".".to_string() + extension.as_str(); - if option_extention != extension - && option_extention != extension_alternative + if option_extension != extension + && option_extension != extension_alternative && !extension.is_empty() { return exec_err!( "File extension '{}' does not match the expected extension '{}'", extension, - option_extention + option_extension ); } From 9f68f3594af38bd9dae97cd7d3666b7c285e70c5 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sun, 29 Oct 2023 22:09:12 +0100 Subject: [PATCH 07/16] chore: simplify the code --- datafusion/core/src/execution/context/mod.rs | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b5bfe8800962..29153eaa1df2 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -851,18 +851,11 @@ impl SessionContext { let listing_options = options.to_listing_options(&session_config); let option_extension = listing_options.file_extension.clone(); - let filename = table_paths[0].prefix().filename(); - let extension = if let Some(filename) = filename { - let parts: Vec<&str> = filename.split('.').collect(); - - if parts.len() > 1 { - parts[1..].join(".") - } else { - "".to_owned() - } - } else { - "".to_owned() - }; + let extension = table_paths[0] + .prefix() + .filename() + .map(|filename| filename.split('.').skip(1).collect::>().join(".")) + .unwrap_or("".to_owned()); // some the file extension might be started with "." and some not let extension_alternative = ".".to_string() + extension.as_str(); From 36c7f545c819d9338c3fa79e8a36b4274b2b83a5 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sun, 29 Oct 2023 22:15:51 +0100 Subject: [PATCH 08/16] fix: check table is empty --- datafusion/core/src/execution/context/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 29153eaa1df2..0521ad6a8413 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -851,6 +851,11 @@ impl SessionContext { let listing_options = options.to_listing_options(&session_config); let option_extension = listing_options.file_extension.clone(); + + if table_paths.is_empty() { + return exec_err!("No table paths were provided"); + } + let extension = table_paths[0] .prefix() .filename() From 4ff028ba51b02a93201347b2b82729f73f42fa75 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sun, 29 Oct 2023 22:42:50 +0100 Subject: [PATCH 09/16] ci: fix test --- datafusion/core/output.parquet.snappy | Bin 846 -> 0 bytes datafusion/core/src/execution/context/parquet.rs | 5 +---- 2 files changed, 1 insertion(+), 4 deletions(-) delete mode 100644 datafusion/core/output.parquet.snappy diff --git a/datafusion/core/output.parquet.snappy b/datafusion/core/output.parquet.snappy deleted file mode 100644 index 2e2eed2b00d7b89b30e16dcde821f6a554fba478..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 846 zcmaJ=F>BjU5I#xIif~MDg75GH5j>=i6jGBy10D>umk^2ulPoO+f^Y@#5ZjUMgsz=J z2z2RC8_%9Pbqe_jow{_(kfA@KW9~_bB-gltlaBAbyZi3@P^)?P8U=3Sj)!@K4S%7iD?1KMLEY{vbH%d_ey3jmdbudAs?$vB*Q8l!d;V;o&^-$o0cc zJ6K?l*3?9rM+_>s<)ITPQLO9jvlGQ+d=XMub7VNC7r`#I(Bm?YE?Q1HXBcX={sp9W_>+6=ycIoEhoMq>Tq(_|f>ymVze_>;zM0%8^=jTXx?4e_pHZV(E@hQa&Ny%{)KqY Date: Mon, 30 Oct 2023 18:09:24 +0100 Subject: [PATCH 10/16] fix: add err info --- datafusion/core/src/execution/context/parquet.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 4226152805fe..4abc2b9df7cc 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -175,8 +175,10 @@ mod tests { }, ) .await; - assert!(read_df.is_err()); - + assert_eq!( + read_df.unwrap_err().strip_backtrace(), + "Execution error: File extension 'parquet.snappy' does not match the expected extension '.parquet'" + ); Ok(()) } From f6c0568319a64e97e80d3d968d3b826f950923c5 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Thu, 2 Nov 2023 10:32:16 +0100 Subject: [PATCH 11/16] refactor: extract the logic to infer_types --- datafusion/core/src/execution/context/mod.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 0521ad6a8413..4a386f710af1 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -837,6 +837,16 @@ impl SessionContext { .insert(f.name.clone(), Arc::new(f)); } + /// Heuristically determines the format (e.g. parquet, csv) to use with the `table_paths` + fn infer_types(table_paths: &Vec) -> Option { + let extension = table_paths[0] + .prefix() + .filename() + .map(|filename| filename.split('.').skip(1).collect::>().join(".")) + .unwrap_or("".to_owned()); + Some(extension) + } + /// Creates a [`DataFrame`] for reading a data source. /// /// For more control such as reading multiple files, you can use @@ -856,11 +866,7 @@ impl SessionContext { return exec_err!("No table paths were provided"); } - let extension = table_paths[0] - .prefix() - .filename() - .map(|filename| filename.split('.').skip(1).collect::>().join(".")) - .unwrap_or("".to_owned()); + let extension = Self::infer_types(&table_paths).unwrap(); // some the file extension might be started with "." and some not let extension_alternative = ".".to_string() + extension.as_str(); From d25069839c6da35000f47e00d545e55bf16c10f2 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Thu, 2 Nov 2023 11:03:42 +0100 Subject: [PATCH 12/16] fix: add tests for different extensions --- .../core/src/execution/context/parquet.rs | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 4abc2b9df7cc..5352acb65efa 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -156,8 +156,21 @@ mod tests { )?)?; write_df + .clone() .write_parquet( - "output.parquet.snappy", + "output1.parquet", + DataFrameWriteOptions::new().with_single_file_output(true), + Some( + WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(), + ), + ) + .await?; + + write_df + .write_parquet( + "output2.parquet.snappy", DataFrameWriteOptions::new().with_single_file_output(true), Some( WriterProperties::builder() @@ -169,12 +182,26 @@ mod tests { let read_df = ctx .read_parquet( - "output.parquet.snappy", + "output1.parquet", + ParquetReadOptions { + ..Default::default() + }, + ) + .await?; + + let results = read_df.collect().await?; + let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); + assert_eq!(total_rows, 5); + + let read_df = ctx + .read_parquet( + "output2.parquet.snappy", ParquetReadOptions { ..Default::default() }, ) .await; + assert_eq!( read_df.unwrap_err().strip_backtrace(), "Execution error: File extension 'parquet.snappy' does not match the expected extension '.parquet'" From 7217be4e9cd7489da13e050319342d4713c1c006 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Thu, 2 Nov 2023 11:13:54 +0100 Subject: [PATCH 13/16] fix: ci clippy --- datafusion/core/src/execution/context/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 4a386f710af1..b39e0176ce9b 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -838,7 +838,7 @@ impl SessionContext { } /// Heuristically determines the format (e.g. parquet, csv) to use with the `table_paths` - fn infer_types(table_paths: &Vec) -> Option { + fn infer_types(table_paths: &[ListingTableUrl]) -> Option { let extension = table_paths[0] .prefix() .filename() From c17158f4887f93d59160f8f93a0ba819c196939b Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Thu, 2 Nov 2023 23:16:39 +0100 Subject: [PATCH 14/16] fix: add more tests --- .../core/src/execution/context/parquet.rs | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 5352acb65efa..79dd447446ad 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -138,7 +138,7 @@ mod tests { } #[tokio::test] - async fn read_from_wrong_file_extentnion() -> Result<()> { + async fn read_from_wrong_file_extension() -> Result<()> { let ctx = SessionContext::new(); // Make up a new dataframe. @@ -155,6 +155,7 @@ mod tests { ], )?)?; + // Write the dataframe to a parquet file named 'output1.parquet' write_df .clone() .write_parquet( @@ -168,7 +169,9 @@ mod tests { ) .await?; + // Write the dataframe to a parquet file named 'output2.parquet.snappy' write_df + .clone() .write_parquet( "output2.parquet.snappy", DataFrameWriteOptions::new().with_single_file_output(true), @@ -180,6 +183,20 @@ mod tests { ) .await?; + // Write the dataframe to a parquet file named 'output3.parquet.snappy.parquet' + write_df + .write_parquet( + "output3.parquet.snappy.parquet", + DataFrameWriteOptions::new().with_single_file_output(true), + Some( + WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(), + ), + ) + .await?; + + // Read the dataframe from 'output1.parquet' with the default file extension. let read_df = ctx .read_parquet( "output1.parquet", @@ -193,6 +210,21 @@ mod tests { let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); assert_eq!(total_rows, 5); + // Read the dataframe from 'output2.parquet.snappy' with the correct file extension. + let read_df = ctx + .read_parquet( + "output2.parquet.snappy", + ParquetReadOptions { + file_extension: "snappy", + ..Default::default() + }, + ) + .await?; + let results = read_df.collect().await?; + let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); + assert_eq!(total_rows, 5); + + // Read the dataframe from 'output3.parquet.snappy.parquet' with the wrong file extension. let read_df = ctx .read_parquet( "output2.parquet.snappy", @@ -204,8 +236,22 @@ mod tests { assert_eq!( read_df.unwrap_err().strip_backtrace(), - "Execution error: File extension 'parquet.snappy' does not match the expected extension '.parquet'" + "Execution error: File 'output2.parquet.snappy' does not match the expected extension '.parquet'" ); + + // Read the dataframe from 'output3.parquet.snappy.parquet' with the correct file extension. + let read_df = ctx + .read_parquet( + "output3.parquet.snappy.parquet", + ParquetReadOptions { + ..Default::default() + }, + ) + .await?; + + let results = read_df.collect().await?; + let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); + assert_eq!(total_rows, 5); Ok(()) } From 5f751b76d8b4a4685d4de46b89e3bd3b53cbe4e9 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Thu, 2 Nov 2023 23:16:58 +0100 Subject: [PATCH 15/16] fix: simplify the logic --- datafusion/core/src/execution/context/mod.rs | 31 +++++--------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b39e0176ce9b..1f1ab446b626 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -837,16 +837,6 @@ impl SessionContext { .insert(f.name.clone(), Arc::new(f)); } - /// Heuristically determines the format (e.g. parquet, csv) to use with the `table_paths` - fn infer_types(table_paths: &[ListingTableUrl]) -> Option { - let extension = table_paths[0] - .prefix() - .filename() - .map(|filename| filename.split('.').skip(1).collect::>().join(".")) - .unwrap_or("".to_owned()); - Some(extension) - } - /// Creates a [`DataFrame`] for reading a data source. /// /// For more control such as reading multiple files, you can use @@ -866,19 +856,14 @@ impl SessionContext { return exec_err!("No table paths were provided"); } - let extension = Self::infer_types(&table_paths).unwrap(); - // some the file extension might be started with "." and some not - let extension_alternative = ".".to_string() + extension.as_str(); - - if option_extension != extension - && option_extension != extension_alternative - && !extension.is_empty() - { - return exec_err!( - "File extension '{}' does not match the expected extension '{}'", - extension, - option_extension - ); + // check if the file extension matches the expected extension + for path in &table_paths { + if !path.as_str().ends_with(&option_extension) { + let file_name = path.prefix().filename().unwrap_or_default(); + return exec_err!( + "File '{file_name}' does not match the expected extension '{option_extension}'" + ); + } } let resolved_schema = options From 744006f9f508601a0e5ed4f8f866d5c28c9daa24 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Thu, 2 Nov 2023 23:50:30 +0100 Subject: [PATCH 16/16] fix: ci --- datafusion/core/src/execution/context/mod.rs | 4 ++-- datafusion/core/src/execution/context/parquet.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 1f1ab446b626..9c500ec07293 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -858,8 +858,8 @@ impl SessionContext { // check if the file extension matches the expected extension for path in &table_paths { - if !path.as_str().ends_with(&option_extension) { - let file_name = path.prefix().filename().unwrap_or_default(); + let file_name = path.prefix().filename().unwrap_or_default(); + if !path.as_str().ends_with(&option_extension) && file_name.contains('.') { return exec_err!( "File '{file_name}' does not match the expected extension '{option_extension}'" ); diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 79dd447446ad..ef1f0143543d 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -138,7 +138,7 @@ mod tests { } #[tokio::test] - async fn read_from_wrong_file_extension() -> Result<()> { + async fn read_from_different_file_extension() -> Result<()> { let ctx = SessionContext::new(); // Make up a new dataframe.