diff --git a/Cargo.lock b/Cargo.lock
index f56467351573..dff964bd90f7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3086,6 +3086,7 @@ dependencies = [
  "polars-compute",
  "polars-error",
  "polars-row",
+ "polars-schema",
  "polars-utils",
  "rand",
  "rand_distr",
@@ -3290,6 +3291,7 @@ dependencies = [
  "polars-core",
  "polars-error",
  "polars-json",
+ "polars-schema",
  "polars-utils",
  "rand",
  "rand_distr",
@@ -3435,6 +3437,17 @@ dependencies = [
  "polars-utils",
 ]
 
+[[package]]
+name = "polars-schema"
+version = "0.42.0"
+dependencies = [
+ "indexmap",
+ "polars-error",
+ "polars-utils",
+ "serde",
+ "version_check",
+]
+
 [[package]]
 name = "polars-sql"
 version = "0.42.0"
diff --git a/Cargo.toml b/Cargo.toml
index bf6bec4b67a3..a83488882fb6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,7 +53,7 @@ flate2 = { version = "1", default-features = false }
 futures = "0.3.25"
 hashbrown = { version = "0.14", features = ["rayon", "ahash", "serde"] }
 hex = "0.4.3"
-indexmap = { version = "2", features = ["std"] }
+indexmap = { version = "2", features = ["std", "serde"] }
 itoa = "1.0.6"
 itoap = { version = "1", features = ["simd"] }
 libc = "0.2"
@@ -112,6 +112,7 @@ polars-pipe = { version = "0.42.0", path = "crates/polars-pipe", default-feature
 polars-plan = { version = "0.42.0", path = "crates/polars-plan", default-features = false }
 polars-python = { version = "0.42.0", path = "crates/polars-python", default-features = false }
 polars-row = { version = "0.42.0", path = "crates/polars-row", default-features = false }
+polars-schema = { version = "0.42.0", path = "crates/polars-schema", default-features = false }
 polars-sql = { version = "0.42.0", path = "crates/polars-sql", default-features = false }
 polars-stream = { version = "0.42.0", path = "crates/polars-stream", default-features = false }
 polars-time = { version = "0.42.0", path = "crates/polars-time", default-features = false }
diff --git a/crates/polars-core/Cargo.toml b/crates/polars-core/Cargo.toml
index 456d63ffae40..8ce74ced160c 100644
--- a/crates/polars-core/Cargo.toml
+++ b/crates/polars-core/Cargo.toml
@@ -12,6 +12,7 @@ description = "Core of the Polars DataFrame library"
 polars-compute = { workspace = true }
 polars-error = { workspace = true }
 polars-row = { workspace = true }
+polars-schema = { workspace = true }
 polars-utils = { workspace = true }
 
 ahash = { workspace = true }
@@ -116,7 +117,7 @@ dtype-struct = []
 bigidx = ["arrow/bigidx", "polars-utils/bigidx"]
 python = []
 
-serde = ["dep:serde", "bitflags/serde"]
+serde = ["dep:serde", "bitflags/serde", "polars-schema/serde"]
 serde-lazy = ["serde", "arrow/serde", "indexmap/serde", "chrono/serde"]
 
 docs-selection = [
diff --git a/crates/polars-core/src/datatypes/field.rs b/crates/polars-core/src/datatypes/field.rs
index 3f0f800ebdbf..1ed898add812 100644
--- a/crates/polars-core/src/datatypes/field.rs
+++ b/crates/polars-core/src/datatypes/field.rs
@@ -13,6 +13,12 @@ pub struct Field {
     pub dtype: DataType,
 }
 
+impl From<Field> for (PlSmallStr, DataType) {
+    fn from(value: Field) -> Self {
+        (value.name, value.dtype)
+    }
+}
+
 pub type FieldRef = Arc<Field>;
 
 impl Field {
diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs
index b99ab52a7d11..2e7b3adfd2f6 100644
--- a/crates/polars-core/src/frame/mod.rs
+++ b/crates/polars-core/src/frame/mod.rs
@@ -539,7 +539,10 @@ impl DataFrame {
     /// # Ok::<(), PolarsError>(())
     /// ```
     pub fn schema(&self) -> Schema {
-        self.columns.as_slice().into()
+        self.columns
+            .iter()
+            .map(|x| (x.name().clone(), x.dtype().clone()))
+            .collect()
     }
 
     /// Get
a reference to the [`DataFrame`] columns. diff --git a/crates/polars-core/src/schema.rs b/crates/polars-core/src/schema.rs index 9354d951d05d..0bf6f1bb2045 100644 --- a/crates/polars-core/src/schema.rs +++ b/crates/polars-core/src/schema.rs @@ -1,227 +1,34 @@ -use std::fmt::{Debug, Formatter}; -use std::hash::{Hash, Hasher}; - -use arrow::datatypes::ArrowSchemaRef; -use indexmap::map::MutableKeys; -use indexmap::IndexMap; -use polars_utils::aliases::PlRandomState; -use polars_utils::itertools::Itertools; +use std::fmt::Debug; + use polars_utils::pl_str::PlSmallStr; -#[cfg(feature = "serde-lazy")] -use serde::{Deserialize, Serialize}; use crate::prelude::*; use crate::utils::try_get_supertype; -/// A map from field/column name to the type of that field/column ([`DataType`]) -#[derive(Eq, Clone, Default)] -#[cfg_attr(feature = "serde-lazy", derive(Serialize, Deserialize))] -pub struct Schema { - inner: PlIndexMap, -} - -impl Hash for Schema { - fn hash(&self, state: &mut H) { - self.inner.iter().for_each(|v| v.hash(state)) - } -} - -// Schemas will only compare equal if they have the same fields in the same order. We can't use `self.inner == -// other.inner` because [`IndexMap`] ignores order when checking equality, but we don't want to ignore it. -impl PartialEq for Schema { - fn eq(&self, other: &Self) -> bool { - self.len() == other.len() && self.iter().zip(other.iter()).all(|(a, b)| a == b) - } -} - -impl Debug for Schema { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - writeln!(f, "Schema:")?; - for (name, dtype) in self.inner.iter() { - writeln!(f, "name: {name}, data type: {dtype:?}")?; - } - Ok(()) - } -} - -impl From<&[Series]> for Schema { - fn from(value: &[Series]) -> Self { - value.iter().map(|s| s.field().into_owned()).collect() - } -} - -impl FromIterator for Schema -where - F: Into, -{ - fn from_iter>(iter: T) -> Self { - let iter = iter.into_iter(); - let mut map: PlIndexMap<_, _> = - IndexMap::with_capacity_and_hasher(iter.size_hint().0, PlRandomState::default()); - for fld in iter { - let fld = fld.into(); - map.insert(fld.name, fld.dtype); - } - Self { inner: map } - } -} - -impl Schema { - /// Create a new, empty schema. - pub fn new() -> Self { - Self::with_capacity(0) - } - - /// Create a new, empty schema with the given capacity. - /// - /// If you know the number of fields you have ahead of time, using this is more efficient than using - /// [`new`][Self::new]. Also consider using [`Schema::from_iter`] if you have the collection of fields available - /// ahead of time. - pub fn with_capacity(capacity: usize) -> Self { - let map: PlIndexMap<_, _> = - IndexMap::with_capacity_and_hasher(capacity, PlRandomState::default()); - Self { inner: map } - } - - /// Reserve `additional` memory spaces in the schema. - pub fn reserve(&mut self, additional: usize) { - self.inner.reserve(additional); - } - - /// The number of fields in the schema. - #[inline] - pub fn len(&self) -> usize { - self.inner.len() - } - - #[inline] - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - /// Rename field `old` to `new`, and return the (owned) old name. - /// - /// If `old` is not present in the schema, the schema is not modified and `None` is returned. Otherwise the schema - /// is updated and `Some(old_name)` is returned. 
-    pub fn rename(&mut self, old: &str, new: PlSmallStr) -> Option<PlSmallStr> {
-        // Remove `old`, get the corresponding index and dtype, and move the last item in the map to that position
-        let (old_index, old_name, dtype) = self.inner.swap_remove_full(old)?;
-        // Insert the same dtype under the new name at the end of the map and store that index
-        let (new_index, _) = self.inner.insert_full(new, dtype);
-        // Swap the two indices to move the originally last element back to the end and to move the new element back to
-        // its original position
-        self.inner.swap_indices(old_index, new_index);
-
-        Some(old_name)
-    }
-
-    /// Create a new schema from this one, inserting a field with `name` and `dtype` at the given `index`.
-    ///
-    /// If a field named `name` already exists, it is updated with the new dtype. Regardless, the field named `name` is
-    /// always moved to the given index. Valid indices range from `0` (front of the schema) to `self.len()` (after the
-    /// end of the schema).
-    ///
-    /// For a mutating version that doesn't clone, see [`insert_at_index`][Self::insert_at_index].
-    ///
-    /// Runtime: **O(m * n)** where `m` is the (average) length of the field names and `n` is the number of fields in
-    /// the schema. This method clones every field in the schema.
-    ///
-    /// Returns: `Ok(new_schema)` if `index <= self.len()`, else `Err(PolarsError)`
-    pub fn new_inserting_at_index(
-        &self,
-        index: usize,
-        name: PlSmallStr,
-        dtype: DataType,
-    ) -> PolarsResult<Self> {
-        polars_ensure!(
-            index <= self.len(),
-            OutOfBounds:
-            "index {} is out of bounds for schema with length {} (the max index allowed is self.len())",
-            index,
-            self.len()
-        );
-
-        let mut new = Self::default();
-        let mut iter = self.inner.iter().filter_map(|(fld_name, dtype)| {
-            (fld_name != &name).then_some((fld_name.clone(), dtype.clone()))
-        });
-        new.inner.extend(iter.by_ref().take(index));
-        new.inner.insert(name.clone(), dtype);
-        new.inner.extend(iter);
-        Ok(new)
-    }
-
-    /// Insert a field with `name` and `dtype` at the given `index` into this schema.
-    ///
-    /// If a field named `name` already exists, it is updated with the new dtype. Regardless, the field named `name` is
-    /// always moved to the given index. Valid indices range from `0` (front of the schema) to `self.len()` (after the
-    /// end of the schema).
-    ///
-    /// For a non-mutating version that clones the schema, see [`new_inserting_at_index`][Self::new_inserting_at_index].
-    ///
-    /// Runtime: **O(n)** where `n` is the number of fields in the schema.
-    ///
-    /// Returns:
-    /// - If index is out of bounds, `Err(PolarsError)`
-    /// - Else if `name` was already in the schema, `Ok(Some(old_dtype))`
-    /// - Else `Ok(None)`
-    pub fn insert_at_index(
-        &mut self,
-        mut index: usize,
-        name: PlSmallStr,
-        dtype: DataType,
-    ) -> PolarsResult<Option<DataType>> {
-        polars_ensure!(
-            index <= self.len(),
-            OutOfBounds:
-            "index {} is out of bounds for schema with length {} (the max index allowed is self.len())",
-            index,
-            self.len()
-        );
+pub type SchemaRef = Arc<Schema>;
+pub type Schema = polars_schema::schema::Schema<DataType>;
 
-        let (old_index, old_dtype) = self.inner.insert_full(name, dtype);
+pub trait SchemaExt {
+    fn from_arrow_schema(value: &ArrowSchema) -> Self;
 
-        // If we're moving an existing field, one-past-the-end will actually be out of bounds. Also, self.len() won't
-        // have changed after inserting, so `index == self.len()` is the same as it was before inserting.
- if old_dtype.is_some() && index == self.len() { - index -= 1; - } - self.inner.move_index(old_index, index); - Ok(old_dtype) - } + fn get_field(&self, name: &str) -> Option; - /// Get a reference to the dtype of the field named `name`, or `None` if the field doesn't exist. - pub fn get(&self, name: &str) -> Option<&DataType> { - self.inner.get(name) - } + fn try_get_field(&self, name: &str) -> PolarsResult; - /// Get a reference to the dtype of the field named `name`, or `Err(PolarsErr)` if the field doesn't exist. - pub fn try_get(&self, name: &str) -> PolarsResult<&DataType> { - self.get(name) - .ok_or_else(|| polars_err!(SchemaFieldNotFound: "{}", name)) - } + fn to_arrow(&self, compat_level: CompatLevel) -> ArrowSchema; - /// Get a mutable reference to the dtype of the field named `name`, or `Err(PolarsErr)` if the field doesn't exist. - pub fn try_get_mut(&mut self, name: &str) -> PolarsResult<&mut DataType> { - self.inner - .get_mut(name) - .ok_or_else(|| polars_err!(SchemaFieldNotFound: "{}", name)) - } + fn iter_fields(&self) -> impl ExactSizeIterator + '_; - /// Return all data about the field named `name`: its index in the schema, its name, and its dtype. - /// - /// Returns `Some((index, &name, &dtype))` if the field exists, `None` if it doesn't. - pub fn get_full(&self, name: &str) -> Option<(usize, &PlSmallStr, &DataType)> { - self.inner.get_full(name) - } + fn to_supertype(&mut self, other: &Schema) -> PolarsResult; +} - /// Return all data about the field named `name`: its index in the schema, its name, and its dtype. - /// - /// Returns `Ok((index, &name, &dtype))` if the field exists, `Err(PolarsErr)` if it doesn't. - pub fn try_get_full(&self, name: &str) -> PolarsResult<(usize, &PlSmallStr, &DataType)> { - self.inner - .get_full(name) - .ok_or_else(|| polars_err!(SchemaFieldNotFound: "{}", name)) +impl SchemaExt for Schema { + fn from_arrow_schema(value: &ArrowSchema) -> Self { + value + .fields + .iter() + .map(|x| (x.name.clone(), DataType::from_arrow(&x.data_type, true))) + .collect() } /// Look up the name in the schema and return an owned [`Field`] by cloning the data. @@ -230,9 +37,8 @@ impl Schema { /// /// This method constructs the `Field` by cloning the name and dtype. For a version that returns references, see /// [`get`][Self::get] or [`get_full`][Self::get_full]. - pub fn get_field(&self, name: &str) -> Option { - self.inner - .get_full(name) + fn get_field(&self, name: &str) -> Option { + self.get_full(name) .map(|(_, name, dtype)| Field::new(name.clone(), dtype.clone())) } @@ -242,139 +48,15 @@ impl Schema { /// /// This method constructs the `Field` by cloning the name and dtype. For a version that returns references, see /// [`get`][Self::get] or [`get_full`][Self::get_full]. - pub fn try_get_field(&self, name: &str) -> PolarsResult { - self.inner - .get_full(name) + fn try_get_field(&self, name: &str) -> PolarsResult { + self.get_full(name) .ok_or_else(|| polars_err!(SchemaFieldNotFound: "{}", name)) .map(|(_, name, dtype)| Field::new(name.clone(), dtype.clone())) } - /// Get references to the name and dtype of the field at `index`. - /// - /// If `index` is inbounds, returns `Some((&name, &dtype))`, else `None`. See - /// [`get_at_index_mut`][Self::get_at_index_mut] for a mutable version. 
- pub fn get_at_index(&self, index: usize) -> Option<(&PlSmallStr, &DataType)> { - self.inner.get_index(index) - } - - pub fn try_get_at_index(&self, index: usize) -> PolarsResult<(&PlSmallStr, &DataType)> { - self.inner.get_index(index).ok_or_else(|| polars_err!(ComputeError: "index {index} out of bounds with 'schema' of len: {}", self.len())) - } - - /// Get mutable references to the name and dtype of the field at `index`. - /// - /// If `index` is inbounds, returns `Some((&mut name, &mut dtype))`, else `None`. See - /// [`get_at_index`][Self::get_at_index] for an immutable version. - pub fn get_at_index_mut(&mut self, index: usize) -> Option<(&mut PlSmallStr, &mut DataType)> { - self.inner.get_index_mut2(index) - } - - /// Swap-remove a field by name and, if the field existed, return its dtype. - /// - /// If the field does not exist, the schema is not modified and `None` is returned. - /// - /// This method does a `swap_remove`, which is O(1) but **changes the order of the schema**: the field named `name` - /// is replaced by the last field, which takes its position. For a slower, but order-preserving, method, use - /// [`shift_remove`][Self::shift_remove]. - pub fn remove(&mut self, name: &str) -> Option { - self.inner.swap_remove(name) - } - - /// Remove a field by name, preserving order, and, if the field existed, return its dtype. - /// - /// If the field does not exist, the schema is not modified and `None` is returned. - /// - /// This method does a `shift_remove`, which preserves the order of the fields in the schema but **is O(n)**. For a - /// faster, but not order-preserving, method, use [`remove`][Self::remove]. - pub fn shift_remove(&mut self, name: &str) -> Option { - self.inner.shift_remove(name) - } - - /// Remove a field by name, preserving order, and, if the field existed, return its dtype. - /// - /// If the field does not exist, the schema is not modified and `None` is returned. - /// - /// This method does a `shift_remove`, which preserves the order of the fields in the schema but **is O(n)**. For a - /// faster, but not order-preserving, method, use [`remove`][Self::remove]. - pub fn shift_remove_index(&mut self, index: usize) -> Option<(PlSmallStr, DataType)> { - self.inner.shift_remove_index(index) - } - - /// Whether the schema contains a field named `name`. - pub fn contains(&self, name: &str) -> bool { - self.get(name).is_some() - } - - /// Change the field named `name` to the given `dtype` and return the previous dtype. - /// - /// If `name` doesn't already exist in the schema, the schema is not modified and `None` is returned. Otherwise - /// returns `Some(old_dtype)`. - /// - /// This method only ever modifies an existing field and never adds a new field to the schema. To add a new field, - /// use [`with_column`][Self::with_column] or [`insert_at_index`][Self::insert_at_index]. - pub fn set_dtype(&mut self, name: &str, dtype: DataType) -> Option { - let old_dtype = self.inner.get_mut(name)?; - Some(std::mem::replace(old_dtype, dtype)) - } - - /// Change the field at the given index to the given `dtype` and return the previous dtype. - /// - /// If the index is out of bounds, the schema is not modified and `None` is returned. Otherwise returns - /// `Some(old_dtype)`. - /// - /// This method only ever modifies an existing index and never adds a new field to the schema. To add a new field, - /// use [`with_column`][Self::with_column] or [`insert_at_index`][Self::insert_at_index]. 
- pub fn set_dtype_at_index(&mut self, index: usize, dtype: DataType) -> Option { - let (_, old_dtype) = self.inner.get_index_mut(index)?; - Some(std::mem::replace(old_dtype, dtype)) - } - - /// Insert a new column in the [`Schema`]. - /// - /// If an equivalent name already exists in the schema: the name remains and - /// retains in its place in the order, its corresponding value is updated - /// with [`DataType`] and the older dtype is returned inside `Some(_)`. - /// - /// If no equivalent key existed in the map: the new name-dtype pair is - /// inserted, last in order, and `None` is returned. - /// - /// To enforce the index of the resulting field, use [`insert_at_index`][Self::insert_at_index]. - /// - /// Computes in **O(1)** time (amortized average). - pub fn with_column(&mut self, name: PlSmallStr, dtype: DataType) -> Option { - self.inner.insert(name, dtype) - } - - /// Merge `other` into `self`. - /// - /// Merging logic: - /// - Fields that occur in `self` but not `other` are unmodified - /// - Fields that occur in `other` but not `self` are appended, in order, to the end of `self` - /// - Fields that occur in both `self` and `other` are updated with the dtype from `other`, but keep their original - /// index - pub fn merge(&mut self, other: Self) { - self.inner.extend(other.inner) - } - - /// Merge borrowed `other` into `self`. - /// - /// Merging logic: - /// - Fields that occur in `self` but not `other` are unmodified - /// - Fields that occur in `other` but not `self` are appended, in order, to the end of `self` - /// - Fields that occur in both `self` and `other` are updated with the dtype from `other`, but keep their original - /// index - pub fn merge_from_ref(&mut self, other: &Self) { - self.inner.extend( - other - .iter() - .map(|(column, datatype)| (column.clone(), datatype.clone())), - ) - } - /// Convert self to `ArrowSchema` by cloning the fields. - pub fn to_arrow(&self, compat_level: CompatLevel) -> ArrowSchema { + fn to_arrow(&self, compat_level: CompatLevel) -> ArrowSchema { let fields: Vec<_> = self - .inner .iter() .map(|(name, dtype)| dtype.to_arrow_field(name.clone(), compat_level)) .collect(); @@ -385,40 +67,17 @@ impl Schema { /// /// Note that this clones each name and dtype in order to form an owned [`Field`]. For a clone-free version, use /// [`iter`][Self::iter], which returns `(&name, &dtype)`. - pub fn iter_fields(&self) -> impl ExactSizeIterator + '_ { - self.inner - .iter() + fn iter_fields(&self) -> impl ExactSizeIterator + '_ { + self.iter() .map(|(name, dtype)| Field::new(name.clone(), dtype.clone())) } - /// Iterates over references to the dtypes in this schema. - pub fn iter_dtypes(&self) -> impl '_ + ExactSizeIterator { - self.inner.iter().map(|(_name, dtype)| dtype) - } - - /// Iterates over mut references to the dtypes in this schema. - pub fn iter_dtypes_mut(&mut self) -> impl '_ + ExactSizeIterator { - self.inner.iter_mut().map(|(_name, dtype)| dtype) - } - - /// Iterates over references to the names in this schema. - pub fn iter_names(&self) -> impl '_ + ExactSizeIterator { - self.inner.iter().map(|(name, _dtype)| name) - } - - /// Iterates over the `(&name, &dtype)` pairs in this schema. - /// - /// For an owned version, use [`iter_fields`][Self::iter_fields], which clones the data to iterate owned `Field`s - pub fn iter(&self) -> impl Iterator + '_ { - self.inner.iter() - } - /// Take another [`Schema`] and try to find the supertypes between them. 
- pub fn to_supertype(&mut self, other: &Schema) -> PolarsResult { + fn to_supertype(&mut self, other: &Schema) -> PolarsResult { polars_ensure!(self.len() == other.len(), ComputeError: "schema lengths differ"); let mut changed = false; - for ((k, dt), (other_k, other_dt)) in self.inner.iter_mut().zip(other.iter()) { + for ((k, dt), (other_k, other_dt)) in self.iter_mut().zip(other.iter()) { polars_ensure!(k == other_k, ComputeError: "schema names differ: got {}, expected {}", k, other_k); let st = try_get_supertype(dt, other_dt)?; @@ -427,38 +86,6 @@ impl Schema { } Ok(changed) } - - /// Generates another schema with just the specified columns selected from this one. - pub fn select(&self, columns: I) -> PolarsResult - where - I: IntoIterator, - I::Item: AsRef, - { - Ok(Self { - inner: columns - .into_iter() - .map(|c| { - let name = c.as_ref(); - let (_, name, dtype) = self - .inner - .get_full(name) - .ok_or_else(|| polars_err!(col_not_found = name))?; - PolarsResult::Ok((name.clone(), dtype.clone())) - }) - .try_collect()?, - }) - } -} - -pub type SchemaRef = Arc; - -impl IntoIterator for Schema { - type Item = (PlSmallStr, DataType); - type IntoIter = as IntoIterator>::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.inner.into_iter() - } } /// This trait exists to be unify the API of polars Schema and arrows Schema. @@ -485,7 +112,7 @@ pub trait IndexOfSchema: Debug { impl IndexOfSchema for Schema { fn index_of(&self, name: &str) -> Option { - self.inner.get_index_of(name) + self.index_of(name) } fn get_names(&self) -> Vec<&PlSmallStr> { @@ -532,8 +159,7 @@ impl SchemaNamesAndDtypes for Schema { type DataType = DataType; fn get_names_and_dtypes(&'_ self) -> Vec<(&'_ str, Self::DataType)> { - self.inner - .iter() + self.iter() .map(|(name, dtype)| (name.as_str(), dtype.clone())) .collect() } @@ -551,29 +177,6 @@ impl SchemaNamesAndDtypes for ArrowSchema { } } -impl From<&ArrowSchema> for Schema { - fn from(value: &ArrowSchema) -> Self { - Self::from_iter(value.fields.iter()) - } -} -impl From for Schema { - fn from(value: ArrowSchema) -> Self { - Self::from(&value) - } -} - -impl From for Schema { - fn from(value: ArrowSchemaRef) -> Self { - Self::from(value.as_ref()) - } -} - -impl From<&ArrowSchemaRef> for Schema { - fn from(value: &ArrowSchemaRef) -> Self { - Self::from(value.as_ref()) - } -} - pub fn ensure_matching_schema(lhs: &S, rhs: &S) -> PolarsResult<()> { let lhs = lhs.get_names_and_dtypes(); let rhs = rhs.get_names_and_dtypes(); diff --git a/crates/polars-io/src/avro/read.rs b/crates/polars-io/src/avro/read.rs index 6d410c74ef0e..3c386ef9d77a 100644 --- a/crates/polars-io/src/avro/read.rs +++ b/crates/polars-io/src/avro/read.rs @@ -39,7 +39,7 @@ impl AvroReader { /// Get schema of the Avro File pub fn schema(&mut self) -> PolarsResult { let schema = self.arrow_schema()?; - Ok(Schema::from_iter(&schema.fields)) + Ok(Schema::from_arrow_schema(&schema)) } /// Get arrow schema of the avro File, this is faster than a polars schema. 
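// Illustrative sketch (not part of the patch): with the blanket `From<&ArrowSchema>`
// conversions removed, call sites such as the Avro reader above now build a Polars
// schema through the explicit `SchemaExt::from_arrow_schema` constructor. This uses
// only items visible in this diff; `to_polars_schema` is a hypothetical helper.

use polars_core::prelude::ArrowSchema;
use polars_core::schema::{Schema, SchemaExt};

fn to_polars_schema(arrow_schema: &ArrowSchema) -> Schema {
    // Previously written as `Schema::from(arrow_schema)` or `arrow_schema.into()`.
    Schema::from_arrow_schema(arrow_schema)
}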
diff --git a/crates/polars-io/src/csv/read/schema_inference.rs b/crates/polars-io/src/csv/read/schema_inference.rs index c8870ee65b30..50942d8b5d56 100644 --- a/crates/polars-io/src/csv/read/schema_inference.rs +++ b/crates/polars-io/src/csv/read/schema_inference.rs @@ -314,7 +314,7 @@ fn infer_file_schema_inner( decimal_comma, ); } else if !raise_if_empty { - return Ok((Schema::new(), 0, 0)); + return Ok((Schema::default(), 0, 0)); } else { polars_bail!(NoData: "empty CSV"); }; diff --git a/crates/polars-io/src/ipc/ipc_reader_async.rs b/crates/polars-io/src/ipc/ipc_reader_async.rs index 2949a334335e..089dceaf9a8a 100644 --- a/crates/polars-io/src/ipc/ipc_reader_async.rs +++ b/crates/polars-io/src/ipc/ipc_reader_async.rs @@ -5,7 +5,7 @@ use object_store::path::Path; use object_store::ObjectMeta; use polars_core::datatypes::IDX_DTYPE; use polars_core::frame::DataFrame; -use polars_core::schema::Schema; +use polars_core::schema::{Schema, SchemaExt}; use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult}; use polars_utils::pl_str::PlSmallStr; @@ -157,7 +157,10 @@ impl IpcReaderAsync { &fetched_metadata }; - let schema = prepare_schema((&metadata.schema).into(), options.row_index.as_ref()); + let schema = prepare_schema( + Schema::from_arrow_schema(metadata.schema.as_ref()), + options.row_index.as_ref(), + ); let hive_partitions = None; diff --git a/crates/polars-io/src/ipc/ipc_stream.rs b/crates/polars-io/src/ipc/ipc_stream.rs index 06fbc581ea30..545f19168f9f 100644 --- a/crates/polars-io/src/ipc/ipc_stream.rs +++ b/crates/polars-io/src/ipc/ipc_stream.rs @@ -76,7 +76,7 @@ pub struct IpcStreamReader { impl IpcStreamReader { /// Get schema of the Ipc Stream File pub fn schema(&mut self) -> PolarsResult { - Ok(Schema::from_iter(&self.metadata()?.schema.fields)) + Ok(Schema::from_arrow_schema(&self.metadata()?.schema)) } /// Get arrow schema of the Ipc Stream File, this is faster than creating a polars schema. 
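// Illustrative sketch (not part of the patch): the json/ndjson call sites below now
// collect a schema from `(PlSmallStr, DataType)` pairs instead of relying on a
// `Field`-based `FromIterator`. The hypothetical helper here shows that pattern with
// owned `Field`s, using the `From<Field> for (PlSmallStr, DataType)` impl added in
// `datatypes/field.rs`; the actual call sites convert their own field types.

use polars_core::prelude::{DataType, Field};
use polars_core::schema::Schema;
use polars_utils::pl_str::PlSmallStr;

fn schema_from_fields(fields: Vec<Field>) -> Schema {
    // Each `Field` converts into a `(PlSmallStr, DataType)` pair, which the
    // generic `Schema` collects through its tuple-based `FromIterator` impl.
    fields
        .into_iter()
        .map(Into::<(PlSmallStr, DataType)>::into)
        .collect()
}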
diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index 01763edeaa59..1a8f9eb8f5a4 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -301,7 +301,7 @@ where polars_bail!(ComputeError: "can only deserialize json objects") }; - let mut schema = Schema::from_iter(fields.iter()); + let mut schema = Schema::from_iter(fields.iter().map(Into::::into)); overwrite_schema(&mut schema, overwrite)?; DataType::Struct( diff --git a/crates/polars-io/src/ndjson/mod.rs b/crates/polars-io/src/ndjson/mod.rs index 4ec6ffa7a1da..196842d4ad1a 100644 --- a/crates/polars-io/src/ndjson/mod.rs +++ b/crates/polars-io/src/ndjson/mod.rs @@ -15,6 +15,7 @@ pub fn infer_schema( crate::json::infer::data_types_to_supertype(data_types.map(|dt| DataType::from(&dt)))?; let schema = StructArray::get_fields(&data_type.to_arrow(CompatLevel::newest())) .iter() + .map(Into::::into) .collect(); Ok(schema) } diff --git a/crates/polars-io/src/parquet/read/predicates.rs b/crates/polars-io/src/parquet/read/predicates.rs index 2c4b7c403db6..b5003f1a1feb 100644 --- a/crates/polars-io/src/parquet/read/predicates.rs +++ b/crates/polars-io/src/parquet/read/predicates.rs @@ -38,7 +38,7 @@ pub(crate) fn collect_statistics( } Ok(Some(BatchStats::new( - Arc::new(schema.into()), + Arc::new(Schema::from_arrow_schema(schema)), stats, Some(part_md.num_rows()), ))) diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 8a61b74796c7..096e7d8170c8 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -496,7 +496,7 @@ fn rg_to_dfs_prefiltered( .collect::>>()?; let mut rearranged_schema = df.schema(); - rearranged_schema.merge(Schema::from(schema)); + rearranged_schema.merge(Schema::from_arrow_schema(schema.as_ref())); debug_assert!(rg_columns.iter().all(|v| v.len() == df.height())); diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index 8776c46060ba..777f769866d0 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -244,7 +244,7 @@ fn get_pipeline_node( // so we just create a scan that returns an empty df let dummy = lp_arena.add(IR::DataFrameScan { df: Arc::new(DataFrame::empty()), - schema: Arc::new(Schema::new()), + schema: Arc::new(Schema::default()), output_schema: None, filter: None, }); diff --git a/crates/polars-lazy/src/tests/schema.rs b/crates/polars-lazy/src/tests/schema.rs index ec11d16a5359..e4166d33c94a 100644 --- a/crates/polars-lazy/src/tests/schema.rs +++ b/crates/polars-lazy/src/tests/schema.rs @@ -33,7 +33,7 @@ fn test_schema_update_after_projection_pd() -> PolarsResult<()> { )); let schema = lp.schema(&lp_arena).into_owned(); - let mut expected = Schema::new(); + let mut expected = Schema::default(); expected.with_column("a".into(), DataType::Int32); expected.with_column("b".into(), DataType::Int32); assert_eq!(schema.as_ref(), &expected); diff --git a/crates/polars-ops/Cargo.toml b/crates/polars-ops/Cargo.toml index c60e91af7021..0782f188b1df 100644 --- a/crates/polars-ops/Cargo.toml +++ b/crates/polars-ops/Cargo.toml @@ -13,6 +13,7 @@ polars-compute = { workspace = true } polars-core = { workspace = true, features = ["algorithm_group_by", "zip_with"] } polars-error = { workspace = true } polars-json = { workspace = true, optional = true 
} +polars-schema = { workspace = true } polars-utils = { workspace = true } ahash = { workspace = true } @@ -82,7 +83,7 @@ timezones = ["chrono", "chrono-tz", "polars-core/temporal", "polars-core/timezon random = ["rand", "rand_distr"] rank = ["rand"] find_many = ["aho-corasick"] -serde = ["dep:serde", "polars-core/serde", "polars-utils/serde"] +serde = ["dep:serde", "polars-core/serde", "polars-utils/serde", "polars-schema/serde"] # extra utilities for BinaryChunked binary_encoding = ["base64", "hex"] diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index 54654f3ab5b0..0200ef1403da 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -189,7 +189,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut ConversionContext) -> PolarsResult { - owned = Some(Schema::from(v)); + owned = Some(Schema::from_arrow_schema(v.as_ref())); owned.as_ref().unwrap() }, Either::Right(v) => v.as_ref(), diff --git a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs index 7191f30b7743..9b2636430622 100644 --- a/crates/polars-plan/src/plans/conversion/scans.rs +++ b/crates/polars-plan/src/plans/conversion/scans.rs @@ -57,8 +57,10 @@ pub(super) fn parquet_file_info( let num_rows = reader.num_rows().await?; let metadata = reader.get_metadata().await?.clone(); - let schema = - prepare_output_schema((&reader_schema).into(), file_options.row_index.as_ref()); + let schema = prepare_output_schema( + Schema::from_arrow_schema(reader_schema.as_ref()), + file_options.row_index.as_ref(), + ); PolarsResult::Ok((schema, reader_schema, Some(num_rows), Some(metadata))) })? } @@ -66,8 +68,10 @@ pub(super) fn parquet_file_info( let file = polars_utils::open_file(path)?; let mut reader = ParquetReader::new(file); let reader_schema = reader.schema()?; - let schema = - prepare_output_schema((&reader_schema).into(), file_options.row_index.as_ref()); + let schema = prepare_output_schema( + Schema::from_arrow_schema(reader_schema.as_ref()), + file_options.row_index.as_ref(), + ); ( schema, reader_schema, @@ -115,7 +119,7 @@ pub(super) fn ipc_file_info( }; let file_info = FileInfo::new( prepare_output_schema( - metadata.schema.as_ref().into(), + Schema::from_arrow_schema(metadata.schema.as_ref()), file_options.row_index.as_ref(), ), Some(Either::Left(Arc::clone(&metadata.schema))), diff --git a/crates/polars-python/src/interop/arrow/to_py.rs b/crates/polars-python/src/interop/arrow/to_py.rs index b2bfa86bf2cd..c8ac9aee7093 100644 --- a/crates/polars-python/src/interop/arrow/to_py.rs +++ b/crates/polars-python/src/interop/arrow/to_py.rs @@ -5,7 +5,7 @@ use arrow::ffi; use arrow::record_batch::RecordBatch; use polars::datatypes::CompatLevel; use polars::frame::DataFrame; -use polars::prelude::{ArrayRef, ArrowField, PlSmallStr}; +use polars::prelude::{ArrayRef, ArrowField, PlSmallStr, SchemaExt}; use polars::series::Series; use polars_core::utils::arrow; use polars_error::PolarsResult; diff --git a/crates/polars-schema/Cargo.toml b/crates/polars-schema/Cargo.toml new file mode 100644 index 000000000000..f928d9f36925 --- /dev/null +++ b/crates/polars-schema/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "polars-schema" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +description = "Private crate for schema 
utilities for the Polars DataFrame library"
+
+[dependencies]
+indexmap = { workspace = true }
+polars-error = { workspace = true }
+polars-utils = { workspace = true }
+serde = { workspace = true, optional = true }
+
+[build-dependencies]
+version_check = { workspace = true }
+
+[features]
+nightly = []
+serde = ["dep:serde", "serde/derive"]
diff --git a/crates/polars-schema/LICENSE b/crates/polars-schema/LICENSE
new file mode 120000
index 000000000000..30cff7403da0
--- /dev/null
+++ b/crates/polars-schema/LICENSE
@@ -0,0 +1 @@
+../../LICENSE
\ No newline at end of file
diff --git a/crates/polars-schema/README.md b/crates/polars-schema/README.md
new file mode 100644
index 000000000000..6d68ee41675a
--- /dev/null
+++ b/crates/polars-schema/README.md
@@ -0,0 +1,5 @@
+# polars-schema
+
+`polars-schema` is an **internal sub-crate** of the [Polars](https://crates.io/crates/polars) library, supplying private schema utility functions.
+
+**Important Note**: This crate is **not intended for external usage**. Please refer to the main [Polars crate](https://crates.io/crates/polars) for intended usage.
diff --git a/crates/polars-schema/build.rs b/crates/polars-schema/build.rs
new file mode 100644
index 000000000000..3e4ab64620ac
--- /dev/null
+++ b/crates/polars-schema/build.rs
@@ -0,0 +1,7 @@
+fn main() {
+    println!("cargo:rerun-if-changed=build.rs");
+    let channel = version_check::Channel::read().unwrap();
+    if channel.is_nightly() {
+        println!("cargo:rustc-cfg=feature=\"nightly\"");
+    }
+}
diff --git a/crates/polars-schema/src/lib.rs b/crates/polars-schema/src/lib.rs
new file mode 100644
index 000000000000..1ce7e1766648
--- /dev/null
+++ b/crates/polars-schema/src/lib.rs
@@ -0,0 +1 @@
+pub mod schema;
diff --git a/crates/polars-schema/src/schema.rs b/crates/polars-schema/src/schema.rs
new file mode 100644
index 000000000000..04be14455bea
--- /dev/null
+++ b/crates/polars-schema/src/schema.rs
@@ -0,0 +1,393 @@
+use core::fmt::{Debug, Formatter};
+use core::hash::{Hash, Hasher};
+
+use indexmap::map::MutableKeys;
+use polars_error::{polars_ensure, polars_err, PolarsResult};
+use polars_utils::aliases::{InitHashMaps, PlIndexMap};
+use polars_utils::pl_str::PlSmallStr;
+
+#[derive(Clone, Default)]
+#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
+pub struct Schema<D> {
+    fields: PlIndexMap<PlSmallStr, D>,
+}
+
+impl<D: Eq> Eq for Schema<D> {}
+
+impl<D> Schema<D>
+where
+    D: Clone + Default,
+{
+    pub fn with_capacity(capacity: usize) -> Self {
+        let fields = PlIndexMap::with_capacity(capacity);
+        Self { fields }
+    }
+
+    /// Reserve `additional` memory spaces in the schema.
+    pub fn reserve(&mut self, additional: usize) {
+        self.fields.reserve(additional);
+    }
+
+    /// The number of fields in the schema.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.fields.len()
+    }
+
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.fields.is_empty()
+    }
+
+    /// Rename field `old` to `new`, and return the (owned) old name.
+    ///
+    /// If `old` is not present in the schema, the schema is not modified and `None` is returned. Otherwise the schema
+    /// is updated and `Some(old_name)` is returned.
+ pub fn rename(&mut self, old: &str, new: PlSmallStr) -> Option { + // Remove `old`, get the corresponding index and dtype, and move the last item in the map to that position + let (old_index, old_name, dtype) = self.fields.swap_remove_full(old)?; + // Insert the same dtype under the new name at the end of the map and store that index + let (new_index, _) = self.fields.insert_full(new, dtype); + // Swap the two indices to move the originally last element back to the end and to move the new element back to + // its original position + self.fields.swap_indices(old_index, new_index); + + Some(old_name) + } + + /// Create a new schema from this one, inserting a field with `name` and `dtype` at the given `index`. + /// + /// If a field named `name` already exists, it is updated with the new dtype. Regardless, the field named `name` is + /// always moved to the given index. Valid indices range from `0` (front of the schema) to `self.len()` (after the + /// end of the schema). + /// + /// For a mutating version that doesn't clone, see [`insert_at_index`][Self::insert_at_index]. + /// + /// Runtime: **O(m * n)** where `m` is the (average) length of the field names and `n` is the number of fields in + /// the schema. This method clones every field in the schema. + /// + /// Returns: `Ok(new_schema)` if `index <= self.len()`, else `Err(PolarsError)` + pub fn new_inserting_at_index( + &self, + index: usize, + name: PlSmallStr, + field: D, + ) -> PolarsResult { + polars_ensure!( + index <= self.len(), + OutOfBounds: + "index {} is out of bounds for schema with length {} (the max index allowed is self.len())", + index, + self.len() + ); + + let mut new = Self::default(); + let mut iter = self.fields.iter().filter_map(|(fld_name, dtype)| { + (fld_name != &name).then_some((fld_name.clone(), dtype.clone())) + }); + new.fields.extend(iter.by_ref().take(index)); + new.fields.insert(name.clone(), field); + new.fields.extend(iter); + Ok(new) + } + + /// Insert a field with `name` and `dtype` at the given `index` into this schema. + /// + /// If a field named `name` already exists, it is updated with the new dtype. Regardless, the field named `name` is + /// always moved to the given index. Valid indices range from `0` (front of the schema) to `self.len()` (after the + /// end of the schema). + /// + /// For a non-mutating version that clones the schema, see [`new_inserting_at_index`][Self::new_inserting_at_index]. + /// + /// Runtime: **O(n)** where `n` is the number of fields in the schema. + /// + /// Returns: + /// - If index is out of bounds, `Err(PolarsError)` + /// - Else if `name` was already in the schema, `Ok(Some(old_dtype))` + /// - Else `Ok(None)` + pub fn insert_at_index( + &mut self, + mut index: usize, + name: PlSmallStr, + dtype: D, + ) -> PolarsResult> { + polars_ensure!( + index <= self.len(), + OutOfBounds: + "index {} is out of bounds for schema with length {} (the max index allowed is self.len())", + index, + self.len() + ); + + let (old_index, old_dtype) = self.fields.insert_full(name, dtype); + + // If we're moving an existing field, one-past-the-end will actually be out of bounds. Also, self.len() won't + // have changed after inserting, so `index == self.len()` is the same as it was before inserting. + if old_dtype.is_some() && index == self.len() { + index -= 1; + } + self.fields.move_index(old_index, index); + Ok(old_dtype) + } + + /// Get a reference to the dtype of the field named `name`, or `None` if the field doesn't exist. 
+ pub fn get(&self, name: &str) -> Option<&D> { + self.fields.get(name) + } + + /// Get a reference to the dtype of the field named `name`, or `Err(PolarsErr)` if the field doesn't exist. + pub fn try_get(&self, name: &str) -> PolarsResult<&D> { + self.get(name) + .ok_or_else(|| polars_err!(SchemaFieldNotFound: "{}", name)) + } + + /// Get a mutable reference to the dtype of the field named `name`, or `Err(PolarsErr)` if the field doesn't exist. + pub fn try_get_mut(&mut self, name: &str) -> PolarsResult<&mut D> { + self.fields + .get_mut(name) + .ok_or_else(|| polars_err!(SchemaFieldNotFound: "{}", name)) + } + + /// Return all data about the field named `name`: its index in the schema, its name, and its dtype. + /// + /// Returns `Some((index, &name, &dtype))` if the field exists, `None` if it doesn't. + pub fn get_full(&self, name: &str) -> Option<(usize, &PlSmallStr, &D)> { + self.fields.get_full(name) + } + + /// Return all data about the field named `name`: its index in the schema, its name, and its dtype. + /// + /// Returns `Ok((index, &name, &dtype))` if the field exists, `Err(PolarsErr)` if it doesn't. + pub fn try_get_full(&self, name: &str) -> PolarsResult<(usize, &PlSmallStr, &D)> { + self.fields + .get_full(name) + .ok_or_else(|| polars_err!(SchemaFieldNotFound: "{}", name)) + } + + /// Get references to the name and dtype of the field at `index`. + /// + /// If `index` is inbounds, returns `Some((&name, &dtype))`, else `None`. See + /// [`get_at_index_mut`][Self::get_at_index_mut] for a mutable version. + pub fn get_at_index(&self, index: usize) -> Option<(&PlSmallStr, &D)> { + self.fields.get_index(index) + } + + pub fn try_get_at_index(&self, index: usize) -> PolarsResult<(&PlSmallStr, &D)> { + self.fields.get_index(index).ok_or_else(|| polars_err!(ComputeError: "index {index} out of bounds with 'schema' of len: {}", self.len())) + } + + /// Get mutable references to the name and dtype of the field at `index`. + /// + /// If `index` is inbounds, returns `Some((&mut name, &mut dtype))`, else `None`. See + /// [`get_at_index`][Self::get_at_index] for an immutable version. + pub fn get_at_index_mut(&mut self, index: usize) -> Option<(&mut PlSmallStr, &mut D)> { + self.fields.get_index_mut2(index) + } + + /// Swap-remove a field by name and, if the field existed, return its dtype. + /// + /// If the field does not exist, the schema is not modified and `None` is returned. + /// + /// This method does a `swap_remove`, which is O(1) but **changes the order of the schema**: the field named `name` + /// is replaced by the last field, which takes its position. For a slower, but order-preserving, method, use + /// [`shift_remove`][Self::shift_remove]. + pub fn remove(&mut self, name: &str) -> Option { + self.fields.swap_remove(name) + } + + /// Remove a field by name, preserving order, and, if the field existed, return its dtype. + /// + /// If the field does not exist, the schema is not modified and `None` is returned. + /// + /// This method does a `shift_remove`, which preserves the order of the fields in the schema but **is O(n)**. For a + /// faster, but not order-preserving, method, use [`remove`][Self::remove]. + pub fn shift_remove(&mut self, name: &str) -> Option { + self.fields.shift_remove(name) + } + + /// Remove a field by name, preserving order, and, if the field existed, return its dtype. + /// + /// If the field does not exist, the schema is not modified and `None` is returned. 
+ /// + /// This method does a `shift_remove`, which preserves the order of the fields in the schema but **is O(n)**. For a + /// faster, but not order-preserving, method, use [`remove`][Self::remove]. + pub fn shift_remove_index(&mut self, index: usize) -> Option<(PlSmallStr, D)> { + self.fields.shift_remove_index(index) + } + + /// Whether the schema contains a field named `name`. + pub fn contains(&self, name: &str) -> bool { + self.get(name).is_some() + } + + /// Change the field named `name` to the given `dtype` and return the previous dtype. + /// + /// If `name` doesn't already exist in the schema, the schema is not modified and `None` is returned. Otherwise + /// returns `Some(old_dtype)`. + /// + /// This method only ever modifies an existing field and never adds a new field to the schema. To add a new field, + /// use [`with_column`][Self::with_column] or [`insert_at_index`][Self::insert_at_index]. + pub fn set_dtype(&mut self, name: &str, dtype: D) -> Option { + let old_dtype = self.fields.get_mut(name)?; + Some(std::mem::replace(old_dtype, dtype)) + } + + /// Change the field at the given index to the given `dtype` and return the previous dtype. + /// + /// If the index is out of bounds, the schema is not modified and `None` is returned. Otherwise returns + /// `Some(old_dtype)`. + /// + /// This method only ever modifies an existing index and never adds a new field to the schema. To add a new field, + /// use [`with_column`][Self::with_column] or [`insert_at_index`][Self::insert_at_index]. + pub fn set_dtype_at_index(&mut self, index: usize, dtype: D) -> Option { + let (_, old_dtype) = self.fields.get_index_mut(index)?; + Some(std::mem::replace(old_dtype, dtype)) + } + + /// Insert a new column in the [`Schema`]. + /// + /// If an equivalent name already exists in the schema: the name remains and + /// retains in its place in the order, its corresponding value is updated + /// with [`D`] and the older dtype is returned inside `Some(_)`. + /// + /// If no equivalent key existed in the map: the new name-dtype pair is + /// inserted, last in order, and `None` is returned. + /// + /// To enforce the index of the resulting field, use [`insert_at_index`][Self::insert_at_index]. + /// + /// Computes in **O(1)** time (amortized average). + pub fn with_column(&mut self, name: PlSmallStr, dtype: D) -> Option { + self.fields.insert(name, dtype) + } + + /// Merge `other` into `self`. + /// + /// Merging logic: + /// - Fields that occur in `self` but not `other` are unmodified + /// - Fields that occur in `other` but not `self` are appended, in order, to the end of `self` + /// - Fields that occur in both `self` and `other` are updated with the dtype from `other`, but keep their original + /// index + pub fn merge(&mut self, other: Self) { + self.fields.extend(other.fields) + } + + /// Merge borrowed `other` into `self`. + /// + /// Merging logic: + /// - Fields that occur in `self` but not `other` are unmodified + /// - Fields that occur in `other` but not `self` are appended, in order, to the end of `self` + /// - Fields that occur in both `self` and `other` are updated with the dtype from `other`, but keep their original + /// index + pub fn merge_from_ref(&mut self, other: &Self) { + self.fields.extend( + other + .iter() + .map(|(column, field)| (column.clone(), field.clone())), + ) + } + + /// Iterates over references to the dtypes in this schema. 
+ pub fn iter_dtypes(&self) -> impl '_ + ExactSizeIterator { + self.fields.iter().map(|(_name, dtype)| dtype) + } + + /// Iterates over mut references to the dtypes in this schema. + pub fn iter_dtypes_mut(&mut self) -> impl '_ + ExactSizeIterator { + self.fields.iter_mut().map(|(_name, dtype)| dtype) + } + + /// Iterates over references to the names in this schema. + pub fn iter_names(&self) -> impl '_ + ExactSizeIterator { + self.fields.iter().map(|(name, _dtype)| name) + } + + /// Iterates over the `(&name, &dtype)` pairs in this schema. + /// + /// For an owned version, use [`iter_fields`][Self::iter_fields], which clones the data to iterate owned `Field`s + pub fn iter(&self) -> impl ExactSizeIterator + '_ { + self.fields.iter() + } + + pub fn iter_mut(&mut self) -> impl ExactSizeIterator + '_ { + self.fields.iter_mut() + } + + /// Generates another schema with just the specified columns selected from this one. + pub fn select(&self, columns: I) -> PolarsResult + where + I: IntoIterator, + I::Item: AsRef, + { + let schema = columns + .into_iter() + .map(|c| { + let name = c.as_ref(); + let (_, name, dtype) = self + .fields + .get_full(name) + .ok_or_else(|| polars_err!(col_not_found = name))?; + PolarsResult::Ok((name.clone(), dtype.clone())) + }) + .collect::>>()?; + Ok(Self::from(schema)) + } + + pub fn index_of(&self, name: &str) -> Option { + self.fields.get_index_of(name) + } +} + +impl Debug for Schema { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + writeln!(f, "Schema:")?; + for (name, field) in self.fields.iter() { + writeln!(f, "name: {name}, field: {field:?}")?; + } + Ok(()) + } +} + +impl Hash for Schema { + fn hash(&self, state: &mut H) { + self.fields.iter().for_each(|v| v.hash(state)) + } +} + +// Schemas will only compare equal if they have the same fields in the same order. We can't use `self.inner == +// other.inner` because [`IndexMap`] ignores order when checking equality, but we don't want to ignore it. 
+impl<D: PartialEq> PartialEq for Schema<D> {
+    fn eq(&self, other: &Self) -> bool {
+        self.fields.len() == other.fields.len()
+            && self
+                .fields
+                .iter()
+                .zip(other.fields.iter())
+                .all(|(a, b)| a == b)
+    }
+}
+
+impl<D> From<PlIndexMap<PlSmallStr, D>> for Schema<D> {
+    fn from(fields: PlIndexMap<PlSmallStr, D>) -> Self {
+        Self { fields }
+    }
+}
+
+impl<F, D> FromIterator<F> for Schema<D>
+where
+    F: Into<(PlSmallStr, D)>,
+    D: Clone,
+{
+    fn from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self {
+        let fields = PlIndexMap::from_iter(iter.into_iter().map(|x| x.into()));
+        Self { fields }
+    }
+}
+
+impl<D> IntoIterator for Schema<D> {
+    type IntoIter = <PlIndexMap<PlSmallStr, D> as IntoIterator>::IntoIter;
+    type Item = (PlSmallStr, D);
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.fields.into_iter()
+    }
+}
diff --git a/crates/polars-sql/src/sql_expr.rs b/crates/polars-sql/src/sql_expr.rs
index e957e72fc95c..f0b48566a474 100644
--- a/crates/polars-sql/src/sql_expr.rs
+++ b/crates/polars-sql/src/sql_expr.rs
@@ -1358,7 +1358,7 @@ pub(crate) fn resolve_compound_identifier(
         Ok(Arc::new(if let Some(active_schema) = active_schema {
             active_schema.clone()
         } else {
-            Schema::new()
+            Schema::default()
         }))
     }?;
 
diff --git a/crates/polars-stream/src/nodes/parquet_source.rs b/crates/polars-stream/src/nodes/parquet_source.rs
index f1ff1973920d..55e6e97e578d 100644
--- a/crates/polars-stream/src/nodes/parquet_source.rs
+++ b/crates/polars-stream/src/nodes/parquet_source.rs
@@ -10,7 +10,6 @@ use polars_core::frame::DataFrame;
 use polars_core::prelude::{
     ArrowSchema, ChunkFull, DataType, IdxCa, InitHashMaps, PlHashMap, StringChunked,
 };
-use polars_core::schema::IndexOfSchema;
 use polars_core::series::{IntoSeries, IsSorted, Series};
 use polars_core::utils::operation_exceeded_idxsize_msg;
 use polars_error::{polars_bail, polars_err, PolarsResult};
diff --git a/crates/polars-stream/src/nodes/reduce.rs b/crates/polars-stream/src/nodes/reduce.rs
index 26efe0b41922..2ce9ee2c9464 100644
--- a/crates/polars-stream/src/nodes/reduce.rs
+++ b/crates/polars-stream/src/nodes/reduce.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use polars_core::schema::Schema;
+use polars_core::schema::{Schema, SchemaExt};
 use polars_expr::reduce::{Reduction, ReductionState};
 use polars_utils::itertools::Itertools;
 
diff --git a/crates/polars-stream/src/physical_plan/lower_expr.rs b/crates/polars-stream/src/physical_plan/lower_expr.rs
index b91dbe1419ff..3057583b0992 100644
--- a/crates/polars-stream/src/physical_plan/lower_expr.rs
+++ b/crates/polars-stream/src/physical_plan/lower_expr.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
 
 use polars_core::frame::DataFrame;
 use polars_core::prelude::{Field, InitHashMaps, PlHashMap, PlHashSet};
-use polars_core::schema::Schema;
+use polars_core::schema::{Schema, SchemaExt};
 use polars_error::PolarsResult;
 use polars_expr::planner::get_expr_depth_limit;
 use polars_expr::state::ExecutionState;
diff --git a/crates/polars/tests/it/io/json.rs b/crates/polars/tests/it/io/json.rs
index de8253a9656f..9095d4299bdd 100644
--- a/crates/polars/tests/it/io/json.rs
+++ b/crates/polars/tests/it/io/json.rs
@@ -151,7 +151,7 @@ fn test_read_ndjson_iss_5875() {
         DataType::List(Box::new(DataType::String)),
     );
 
-    let mut schema = Schema::new();
+    let mut schema = Schema::default();
    schema.with_column(
        "struct".into(),
        DataType::Struct(vec![field_int_inner, field_float_inner, field_str_inner]),
diff --git a/docs/src/rust/user-guide/expressions/aggregation.rs b/docs/src/rust/user-guide/expressions/aggregation.rs
index 90f39c9d04ad..9436565330bf 100644
--- a/docs/src/rust/user-guide/expressions/aggregation.rs
+++ b/docs/src/rust/user-guide/expressions/aggregation.rs
@@ -8,7 +8,7 @@ fn main() -> Result<(), Box<dyn Error>> {
     let url = "https://theunitedstates.io/congress-legislators/legislators-historical.csv";
 
-    let mut schema = Schema::new();
+    let mut schema = Schema::default();
     schema.with_column(
         "first_name".into(),
         DataType::Categorical(None, Default::default()),
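// Illustrative sketch (not part of the patch): across the tests and docs touched
// above, `Schema::new()` becomes `Schema::default()`; the generic
// `polars_schema::schema::Schema<D>` derives `Default` rather than exposing an
// inherent `new()`, at least in the code shown in this diff. A hedged example of
// building a schema under the new API (column names here are arbitrary):

use polars_core::prelude::DataType;
use polars_core::schema::Schema;

fn example_schema() -> Schema {
    let mut schema = Schema::default();
    // `with_column` appends a field, or updates the dtype if the name exists.
    schema.with_column("first_name".into(), DataType::String);
    schema.with_column("terms_served".into(), DataType::Int64);
    schema
}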