diff --git a/crates/polars-arrow/src/datatypes/schema.rs b/crates/polars-arrow/src/datatypes/schema.rs
index f4894efa5574..9b01816c1135
--- a/crates/polars-arrow/src/datatypes/schema.rs
+++ b/crates/polars-arrow/src/datatypes/schema.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;

+use polars_error::{polars_bail, PolarsResult};
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};

@@ -62,6 +63,24 @@ impl ArrowSchema {
             metadata: self.metadata,
         }
     }
+
+    pub fn try_project(&self, indices: &[usize]) -> PolarsResult<Self> {
+        let fields = indices.iter().map(|&i| {
+            let Some(out) = self.fields.get(i) else {
+                polars_bail!(
+                    SchemaFieldNotFound: "projection index {} is out of bounds for schema of length {}",
+                    i, self.fields.len()
+                );
+            };
+
+            Ok(out.clone())
+        }).collect::<PolarsResult<Vec<_>>>()?;
+
+        Ok(ArrowSchema {
+            fields,
+            metadata: self.metadata.clone(),
+        })
+    }
 }

 impl From<Vec<Field>> for ArrowSchema {
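Note: a minimal usage sketch of the new `try_project` helper (not part of the patch; the schema, field names, and import paths below are assumptions based on the usual polars-arrow re-exports):

    use polars_arrow::datatypes::{ArrowDataType, ArrowSchema, Field};
    use polars_error::PolarsResult;

    fn project_example() -> PolarsResult<()> {
        // Hypothetical three-column schema.
        let schema = ArrowSchema::from(vec![
            Field::new("a", ArrowDataType::Int64, true),
            Field::new("b", ArrowDataType::Utf8, true),
            Field::new("c", ArrowDataType::Float64, true),
        ]);
        // Keep only columns 0 and 2; metadata is cloned into the projected schema.
        let projected = schema.try_project(&[0, 2])?;
        assert_eq!(projected.fields.len(), 2);
        // An out-of-bounds index yields a SchemaFieldNotFound error instead of a panic.
        assert!(schema.try_project(&[3]).is_err());
        Ok(())
    }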
diff --git a/crates/polars-core/src/schema.rs b/crates/polars-core/src/schema.rs
index 9308cac5255a..f66bc6059fbc
--- a/crates/polars-core/src/schema.rs
+++ b/crates/polars-core/src/schema.rs
@@ -446,6 +446,9 @@ pub trait IndexOfSchema: Debug {
     /// Get a vector of all column names.
     fn get_names(&self) -> Vec<&str>;

+    /// Get a vector of (name, dtype) pairs
+    fn get_names_and_dtypes(&'_ self) -> Vec<(&'_ str, DataType)>;
+
     fn try_index_of(&self, name: &str) -> PolarsResult<usize> {
         self.index_of(name).ok_or_else(|| {
             polars_err!(
@@ -464,6 +467,13 @@ impl IndexOfSchema for Schema {
     fn get_names(&self) -> Vec<&str> {
         self.iter_names().map(|name| name.as_str()).collect()
     }
+
+    fn get_names_and_dtypes(&'_ self) -> Vec<(&'_ str, DataType)> {
+        self.inner
+            .iter()
+            .map(|(name, dtype)| (name.as_str(), dtype.clone()))
+            .collect()
+    }
 }

 impl IndexOfSchema for ArrowSchema {
@@ -474,6 +484,45 @@ impl IndexOfSchema for ArrowSchema {
     fn get_names(&self) -> Vec<&str> {
         self.fields.iter().map(|f| f.name.as_str()).collect()
     }
+
+    fn get_names_and_dtypes(&'_ self) -> Vec<(&'_ str, DataType)> {
+        self.fields
+            .iter()
+            .map(|x| (x.name.as_str(), DataType::from_arrow(&x.data_type, true)))
+            .collect()
+    }
+}
+
+pub trait SchemaNamesAndDtypes {
+    const IS_ARROW: bool;
+    type DataType: Debug + PartialEq;
+
+    /// Get a vector of (name, dtype) pairs
+    fn get_names_and_dtypes(&'_ self) -> Vec<(&'_ str, Self::DataType)>;
+}
+
+impl SchemaNamesAndDtypes for Schema {
+    const IS_ARROW: bool = false;
+    type DataType = DataType;
+
+    fn get_names_and_dtypes(&'_ self) -> Vec<(&'_ str, Self::DataType)> {
+        self.inner
+            .iter()
+            .map(|(name, dtype)| (name.as_str(), dtype.clone()))
+            .collect()
+    }
+}
+
+impl SchemaNamesAndDtypes for ArrowSchema {
+    const IS_ARROW: bool = true;
+    type DataType = ArrowDataType;
+
+    fn get_names_and_dtypes(&'_ self) -> Vec<(&'_ str, Self::DataType)> {
+        self.fields
+            .iter()
+            .map(|x| (x.name.as_str(), x.data_type.clone()))
+            .collect()
+    }
 }

 impl From<&ArrowSchema> for Schema {
@@ -498,3 +547,51 @@ impl From<&ArrowSchemaRef> for Schema {
         Self::from(value.as_ref())
     }
 }
+
+pub fn ensure_matching_schema<S: SchemaNamesAndDtypes>(lhs: &S, rhs: &S) -> PolarsResult<()> {
+    let lhs = lhs.get_names_and_dtypes();
+    let rhs = rhs.get_names_and_dtypes();
+
+    if lhs.len() != rhs.len() {
+        polars_bail!(
+            SchemaMismatch:
+            "schemas contained differing number of columns: {} != {}",
+            lhs.len(), rhs.len(),
+        );
+    }
+
+    for (i, ((l_name, l_dtype), (r_name, r_dtype))) in lhs.iter().zip(&rhs).enumerate() {
+        if l_name != r_name {
+            polars_bail!(
+                SchemaMismatch:
+                "schema names differ at index {}: {} != {}",
+                i, l_name, r_name
+            )
+        }
+        if l_dtype != r_dtype
+            && (!S::IS_ARROW
+                || unsafe {
+                    // For timezone normalization. Easier than writing out the entire PartialEq.
+                    DataType::from_arrow(
+                        std::mem::transmute::<&<S as SchemaNamesAndDtypes>::DataType, &ArrowDataType>(
+                            l_dtype,
+                        ),
+                        true,
+                    ) != DataType::from_arrow(
+                        std::mem::transmute::<&<S as SchemaNamesAndDtypes>::DataType, &ArrowDataType>(
+                            r_dtype,
+                        ),
+                        true,
+                    )
+                })
+        {
+            polars_bail!(
+                SchemaMismatch:
+                "schema dtypes differ at index {} for column {}: {:?} != {:?}",
+                i, l_name, l_dtype, r_dtype
+            )
+        }
+    }
+
+    Ok(())
+}
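Note: a hedged sketch of how `ensure_matching_schema` reports mismatches (not part of the patch; the schemas are made up and the import path assumes the function stays in `polars_core::schema`):

    use polars_core::prelude::*;
    use polars_core::schema::ensure_matching_schema;

    fn schema_check_example() -> PolarsResult<()> {
        let a = Schema::from_iter([
            Field::new("a", DataType::Int64),
            Field::new("b", DataType::Int64),
        ]);
        let b = Schema::from_iter([
            Field::new("c", DataType::Int64),
            Field::new("d", DataType::Int64),
        ]);
        // Identical schemas pass; a differing name or dtype returns a SchemaMismatch
        // error instead of panicking later in the scan.
        ensure_matching_schema(&a, &a)?;
        assert!(ensure_matching_schema(&a, &b).is_err());
        Ok(())
    }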
diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs
index 97d96634be54..166be6d4704a
--- a/crates/polars-io/src/parquet/read/reader.rs
+++ b/crates/polars-io/src/parquet/read/reader.rs
@@ -80,22 +80,38 @@ impl<R: MmapBytesReader> ParquetReader<R> {
         self
     }

-    /// Set the [`Schema`] if already known. This must be exactly the same as
-    /// the schema in the file itself.
-    pub fn with_schema(mut self, schema: Option<ArrowSchemaRef>) -> Self {
-        self.schema = schema;
-        self
+    /// Ensure the schema of the file matches the given schema. Calling this
+    /// after setting the projection will ensure only the projected indices
+    /// are checked.
+    pub fn check_schema(mut self, schema: &ArrowSchema) -> PolarsResult<Self> {
+        let self_schema = self.schema()?;
+        let self_schema = self_schema.as_ref();
+
+        if let Some(ref projection) = self.projection {
+            let projection = projection.as_slice();
+
+            ensure_matching_schema(
+                &schema.try_project(projection)?,
+                &self_schema.try_project(projection)?,
+            )?;
+        } else {
+            ensure_matching_schema(schema, self_schema)?;
+        }
+
+        Ok(self)
     }

     /// [`Schema`] of the file.
     pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
-        match &self.schema {
-            Some(schema) => Ok(schema.clone()),
+        self.schema = Some(match &self.schema {
+            Some(schema) => schema.clone(),
             None => {
                 let metadata = self.get_metadata()?;
-                Ok(Arc::new(read::infer_schema(metadata)?))
+                Arc::new(read::infer_schema(metadata)?)
             },
-        }
+        });
+
+        Ok(self.schema.clone().unwrap())
     }

     /// Use statistics in the parquet to determine if pages
@@ -226,7 +242,6 @@ impl ParquetAsyncReader {
     pub async fn from_uri(
         uri: &str,
         cloud_options: Option<&CloudOptions>,
-        schema: Option<ArrowSchemaRef>,
         metadata: Option<FileMetaDataRef>,
     ) -> PolarsResult<ParquetAsyncReader> {
         Ok(ParquetAsyncReader {
@@ -238,20 +253,40 @@
             predicate: None,
             use_statistics: true,
             hive_partition_columns: None,
-            schema,
+            schema: None,
             parallel: Default::default(),
         })
     }

+    pub async fn check_schema(mut self, schema: &ArrowSchema) -> PolarsResult<Self> {
+        let self_schema = self.schema().await?;
+        let self_schema = self_schema.as_ref();
+
+        if let Some(ref projection) = self.projection {
+            let projection = projection.as_slice();
+
+            ensure_matching_schema(
+                &schema.try_project(projection)?,
+                &self_schema.try_project(projection)?,
+            )?;
+        } else {
+            ensure_matching_schema(schema, self_schema)?;
+        }
+
+        Ok(self)
+    }
+
     pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
-        Ok(match self.schema.as_ref() {
+        self.schema = Some(match self.schema.as_ref() {
             Some(schema) => Arc::clone(schema),
             None => {
                 let metadata = self.reader.get_metadata().await?;
                 let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;
                 Arc::new(arrow_schema)
             },
-        })
+        });
+
+        Ok(self.schema.clone().unwrap())
     }

     pub async fn num_rows(&mut self) -> PolarsResult<usize> {
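Note: a hypothetical caller of the new builder-style check on the synchronous reader (not part of the patch; the path, the projection, and the `expected` schema are placeholders):

    use polars_arrow::datatypes::ArrowSchema;
    use polars_core::prelude::*;
    use polars_io::prelude::*;

    fn read_checked(path: &str, expected: &ArrowSchema) -> PolarsResult<DataFrame> {
        let file = std::fs::File::open(path)?;
        ParquetReader::new(file)
            .with_projection(Some(vec![0, 1]))
            // With a projection set, only the projected fields are compared.
            .check_schema(expected)?
            .finish()
    }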
diff --git a/crates/polars-io/src/utils.rs b/crates/polars-io/src/utils.rs
index af5753fb4b4b..d70cc5d2f02f
--- a/crates/polars-io/src/utils.rs
+++ b/crates/polars-io/src/utils.rs
@@ -270,53 +270,6 @@ pub fn materialize_projection(
     }
 }

-pub fn check_projected_schema_impl(
-    a: &Schema,
-    b: &Schema,
-    projected_names: Option<&[String]>,
-    msg: &str,
-) -> PolarsResult<()> {
-    if !projected_names
-        .map(|projected_names| {
-            projected_names
-                .iter()
-                .all(|name| a.get(name) == b.get(name))
-        })
-        .unwrap_or_else(|| a == b)
-    {
-        polars_bail!(ComputeError: "{msg}\n\n\
-        Expected: {:?}\n\n\
-        Got: {:?}", a, b)
-    }
-    Ok(())
-}
-
-/// Checks if the projected columns are equal
-pub fn check_projected_arrow_schema(
-    a: &ArrowSchema,
-    b: &ArrowSchema,
-    projected_names: Option<&[String]>,
-    msg: &str,
-) -> PolarsResult<()> {
-    if a != b {
-        let a = Schema::from(a);
-        let b = Schema::from(b);
-        check_projected_schema_impl(&a, &b, projected_names, msg)
-    } else {
-        Ok(())
-    }
-}
-
-/// Checks if the projected columns are equal
-pub fn check_projected_schema(
-    a: &Schema,
-    b: &Schema,
-    projected_names: Option<&[String]>,
-    msg: &str,
-) -> PolarsResult<()> {
-    check_projected_schema_impl(a, b, projected_names, msg)
-}
-
 /// Split DataFrame into chunks in preparation for writing. The chunks have a
 /// maximum number of rows per chunk to ensure reasonable memory efficiency when
 /// reading the resulting file, and a minimum size per chunk to ensure
diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs
index 89acc3de4fc7..ba3189ba2e0d
--- a/crates/polars-mem-engine/src/executors/scan/parquet.rs
+++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -91,12 +91,6 @@ impl ParquetExec {
             );

             let mut reader = ParquetReader::new(file)
-                .with_schema(
-                    self.file_info
-                        .reader_schema
-                        .clone()
-                        .map(|either| either.unwrap_left()),
-                )
                 .read_parallel(parallel)
                 .set_low_memory(self.options.low_memory)
                 .use_statistics(self.options.use_statistics)
@@ -141,6 +135,14 @@ impl ParquetExec {
                         .with_row_index(row_index)
                         .with_predicate(predicate.clone())
                         .with_projection(projection.clone())
+                        .check_schema(
+                            self.file_info
+                                .reader_schema
+                                .clone()
+                                .unwrap()
+                                .unwrap_left()
+                                .as_ref(),
+                        )?
                         .finish()
                 },
             )
@@ -164,16 +166,8 @@ impl ParquetExec {
     #[cfg(feature = "cloud")]
     async fn read_async(&mut self) -> PolarsResult<Vec<DataFrame>> {
         let verbose = verbose();
-        let first_schema = self
-            .file_info
-            .reader_schema
-            .as_ref()
-            .expect("should be set")
-            .as_ref()
-            .unwrap_left();
         let first_metadata = &self.metadata;
         let cloud_options = self.cloud_options.as_ref();
-        let with_columns = self.file_options.with_columns.as_ref().map(|v| v.as_ref());
         let mut result = vec![];

         let batch_size = get_file_prefetch_size();
@@ -207,29 +201,14 @@ impl ParquetExec {
             let iter = paths.iter().enumerate().map(|(i, path)| async move {
                 let first_file = batch_start == 0 && i == 0;
                 // use the cached one as this saves a cloud call
-                let (metadata, schema) = if first_file {
-                    (first_metadata.clone(), Some((*first_schema).clone()))
+                let metadata = if first_file {
+                    first_metadata.clone()
                 } else {
-                    (None, None)
+                    None
                 };

-                let mut reader = ParquetAsyncReader::from_uri(
-                    &path.to_string_lossy(),
-                    cloud_options,
-                    // Schema must be the same for all files. The hive partitions are included in this schema.
-                    schema,
-                    metadata,
-                )
-                .await?;
-
-                if !first_file {
-                    let schema = reader.schema().await?;
-                    check_projected_arrow_schema(
-                        first_schema.as_ref(),
-                        schema.as_ref(),
-                        with_columns,
-                        "schema of all files in a single scan_parquet must be equal",
-                    )?
-                }
+                let mut reader =
+                    ParquetAsyncReader::from_uri(&path.to_string_lossy(), cloud_options, metadata)
+                        .await?;

                 let num_rows = reader.num_rows().await?;
                 PolarsResult::Ok((num_rows, reader))
@@ -263,6 +242,15 @@ impl ParquetExec {
                 .as_ref()
                 .map(|x| x[i].materialize_partition_columns());

+            let schema = self
+                .file_info
+                .reader_schema
+                .as_ref()
+                .unwrap()
+                .as_ref()
+                .unwrap_left()
+                .clone();
+
             async move {
                 let file_info = file_info.clone();
                 let remaining_rows_to_read = *remaining_rows_to_read;
@@ -289,6 +277,8 @@ impl ParquetExec {
                     .with_n_rows(remaining_rows_to_read)
                     .with_row_index(row_index)
                     .with_projection(projection)
+                    .check_schema(schema.as_ref())
+                    .await?
                     .use_statistics(use_statistics)
                     .with_predicate(predicate)
                     .set_rechunk(false)
diff --git a/crates/polars-pipe/src/executors/sources/csv.rs b/crates/polars-pipe/src/executors/sources/csv.rs
index 218915a76359..a63692138484
--- a/crates/polars-pipe/src/executors/sources/csv.rs
+++ b/crates/polars-pipe/src/executors/sources/csv.rs
@@ -27,9 +27,7 @@ pub(crate) struct CsvSource {
     // state for multi-file reads
     current_path_idx: usize,
     n_rows_read: usize,
-    // Used to check schema in a way that throws the same error messages as the default engine.
-    // TODO: Refactor the checking code so that we can just use the schema to do this.
-    schema_check_df: DataFrame,
+    first_schema: Schema,
 }

 impl CsvSource {
@@ -145,7 +143,7 @@ impl CsvSource {
             verbose,
             current_path_idx: 0,
             n_rows_read: 0,
-            schema_check_df: Default::default(),
+            first_schema: Default::default(),
         })
     }
 }
@@ -175,11 +173,10 @@ impl Source for CsvSource {
         };

         if first_read_from_file {
-            let first_df = batches.first().unwrap();
-            if self.schema_check_df.width() == 0 {
-                self.schema_check_df = first_df.clear();
+            if self.first_schema.is_empty() {
+                self.first_schema = batches[0].schema();
             }
-            self.schema_check_df.vstack(first_df)?;
+            ensure_matching_schema(&self.first_schema, &batches[0].schema())?;
         }

         let index = get_source_index(0);
diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs
index 781a38749981..1e798cc194ca
--- a/crates/polars-pipe/src/executors/sources/parquet.rs
+++ b/crates/polars-pipe/src/executors/sources/parquet.rs
@@ -3,7 +3,6 @@ use std::ops::Range;
 use std::path::PathBuf;
 use std::sync::Arc;

-use arrow::datatypes::ArrowSchemaRef;
 use polars_core::config::{self, get_file_prefetch_size};
 use polars_core::error::*;
 use polars_core::prelude::Series;
@@ -16,7 +15,7 @@ use polars_io::predicates::PhysicalIoExpr;
 use polars_io::prelude::materialize_projection;
 #[cfg(feature = "async")]
 use polars_io::prelude::ParquetAsyncReader;
-use polars_io::utils::{check_projected_arrow_schema, is_cloud_url};
+use polars_io::utils::is_cloud_url;
 use polars_io::SerReader;
 use polars_plan::plans::FileInfo;
 use polars_plan::prelude::hive::HivePartitions;
@@ -72,7 +71,6 @@ impl ParquetSource {
         FileScanOptions,
         Option<Vec<usize>>,
         usize,
-        Option<ArrowSchemaRef>,
         Option<Vec<Series>>,
     )> {
         let path = &self.paths[index];
@@ -99,18 +97,12 @@ impl ParquetSource {
             eprintln!("STREAMING CHUNK SIZE: {chunk_size} rows")
         }

-        let reader_schema = if self.processed_paths == 0 {
-            self.file_info.reader_schema.clone()
-        } else {
-            None
-        };
         Ok((
             path,
             options,
             file_options,
             projection,
             chunk_size,
-            reader_schema.map(|either| either.unwrap_left()),
             hive_partitions,
         ))
     }
@@ -120,17 +112,24 @@
             return Ok(());
         };
         let predicate = self.predicate.clone();
-        let (path, options, file_options, projection, chunk_size, reader_schema, hive_partitions) =
+        let (path, options, file_options, projection, chunk_size, hive_partitions) =
             self.prepare_init_reader(index)?;

         let batched_reader = {
             let file = std::fs::File::open(path).unwrap();
             ParquetReader::new(file)
-                .with_schema(reader_schema)
+                .with_projection(projection)
+                .check_schema(
+                    self.file_info
+                        .reader_schema
+                        .as_ref()
+                        .unwrap()
+                        .as_ref()
+                        .unwrap_left(),
+                )?
                 .with_n_rows(file_options.n_rows)
                 .with_row_index(file_options.row_index)
                 .with_predicate(predicate.clone())
-                .with_projection(projection)
                 .use_statistics(options.use_statistics)
                 .with_hive_partition_columns(hive_partitions)
                 .batched(chunk_size)?
@@ -140,20 +139,6 @@
     }

     fn finish_init_reader(&mut self, batched_reader: BatchedParquetReader) -> PolarsResult<()> {
-        if self.processed_paths >= 1 {
-            let with_columns = self.file_options.with_columns.as_ref().map(|v| v.as_ref());
-            check_projected_arrow_schema(
-                batched_reader.schema().as_ref(),
-                self.file_info
-                    .reader_schema
-                    .as_ref()
-                    .unwrap()
-                    .as_ref()
-                    .unwrap_left(),
-                with_columns,
-                "schema of all files in a single scan_parquet must be equal",
-            )?;
-        }
         self.batched_readers.push_back(batched_reader);
         self.processed_paths += 1;
         Ok(())
@@ -164,16 +149,25 @@
         let metadata = self.metadata.clone();
         let predicate = self.predicate.clone();
         let cloud_options = self.cloud_options.clone();
-        let (path, options, file_options, projection, chunk_size, reader_schema, hive_partitions) =
+        let (path, options, file_options, projection, chunk_size, hive_partitions) =
            self.prepare_init_reader(index)?;

         let batched_reader = {
             let uri = path.to_string_lossy();
-            ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), reader_schema, metadata)
+            ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata)
                 .await?
                 .with_n_rows(file_options.n_rows)
                 .with_row_index(file_options.row_index)
                 .with_projection(projection)
+                .check_schema(
+                    self.file_info
+                        .reader_schema
+                        .as_ref()
+                        .unwrap()
+                        .as_ref()
+                        .unwrap_left(),
+                )
+                .await?
                 .with_predicate(predicate.clone())
                 .use_statistics(options.use_statistics)
                 .with_hive_partition_columns(hive_partitions)
diff --git a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs
index 2292da0ca889..68e17d278898
--- a/crates/polars-plan/src/plans/conversion/scans.rs
+++ b/crates/polars-plan/src/plans/conversion/scans.rs
@@ -52,8 +52,7 @@ pub(super) fn parquet_file_info(
     {
         let uri = path.to_string_lossy();
         get_runtime().block_on(async {
-            let mut reader =
-                ParquetAsyncReader::from_uri(&uri, cloud_options, None, None).await?;
+            let mut reader = ParquetAsyncReader::from_uri(&uri, cloud_options, None).await?;
             let reader_schema = reader.schema().await?;
             let num_rows = reader.num_rows().await?;
             let metadata = reader.get_metadata().await?.clone();
diff --git a/crates/polars-plan/src/plans/functions/count.rs b/crates/polars-plan/src/plans/functions/count.rs
index aa067f024374..722abcf30c95
--- a/crates/polars-plan/src/plans/functions/count.rs
+++ b/crates/polars-plan/src/plans/functions/count.rs
@@ -122,8 +122,7 @@ async fn count_rows_cloud_parquet(
     let collection = paths.iter().map(|path| {
         with_concurrency_budget(1, || async {
             let mut reader =
-                ParquetAsyncReader::from_uri(&path.to_string_lossy(), cloud_options, None, None)
-                    .await?;
+                ParquetAsyncReader::from_uri(&path.to_string_lossy(), cloud_options, None).await?;
             reader.num_rows().await
         })
     });
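Note: a hedged sketch of the async call sequence after this change, mirroring what the streaming and mem-engine sources now do (not part of the patch; the helper, its arguments, and the import paths are assumptions):

    use polars_arrow::datatypes::ArrowSchemaRef;
    use polars_core::prelude::*;
    use polars_io::prelude::ParquetAsyncReader;

    async fn open_checked(
        uri: &str,
        projection: Option<Vec<usize>>,
        expected: &ArrowSchemaRef,
    ) -> PolarsResult<ParquetAsyncReader> {
        // `from_uri` no longer takes a schema; the expected schema is verified
        // via `check_schema` after the projection has been set.
        ParquetAsyncReader::from_uri(uri, None, None)
            .await?
            .with_projection(projection)
            // The check is async because it may need to fetch the file footer.
            .check_schema(expected.as_ref())
            .await
    }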
diff --git a/py-polars/tests/unit/io/test_lazy_parquet.py b/py-polars/tests/unit/io/test_lazy_parquet.py
index b9ba7d258515..6c4b2842ee13
--- a/py-polars/tests/unit/io/test_lazy_parquet.py
+++ b/py-polars/tests/unit/io/test_lazy_parquet.py
@@ -392,7 +392,8 @@ def test_io_struct_async_12500(tmp_path: Path) -> None:


 @pytest.mark.write_disk()
-def test_parquet_different_schema(tmp_path: Path) -> None:
+@pytest.mark.parametrize("streaming", [True, False])
+def test_parquet_different_schema(tmp_path: Path, streaming: bool) -> None:
     # Schema is different but the projected columns are same dtype.
     f1 = tmp_path / "a.parquet"
     f2 = tmp_path / "b.parquet"
@@ -402,7 +403,9 @@ def test_parquet_different_schema(tmp_path: Path) -> None:
     a.write_parquet(f1)
     b.write_parquet(f2)

-    assert pl.scan_parquet([f1, f2]).select("b").collect().columns == ["b"]
+    assert pl.scan_parquet([f1, f2]).select("b").collect(
+        streaming=streaming
+    ).columns == ["b"]


 @pytest.mark.write_disk()
@@ -437,3 +440,13 @@ def scan_collect() -> None:
     t.join(5)

     assert results[0].equals(df)
+
+
+@pytest.mark.write_disk()
+@pytest.mark.parametrize("streaming", [True, False])
+def test_parquet_schema_mismatch_panic_17067(tmp_path: Path, streaming: bool) -> None:
+    pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}).write_parquet(tmp_path / "1.parquet")
+    pl.DataFrame({"c": [1, 2, 3], "d": [4, 5, 6]}).write_parquet(tmp_path / "2.parquet")
+
+    with pytest.raises(pl.exceptions.SchemaError):
+        pl.scan_parquet(tmp_path).collect(streaming=streaming)