Commit

clean up code
andrewgazelka committed Oct 3, 2024
1 parent 45f885b commit 78fa527
Showing 13 changed files with 95 additions and 56 deletions.
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 0 additions & 2 deletions src/arrow2/Cargo.toml
@@ -67,12 +67,10 @@ serde_json = {version = "^1.0", features = [
], optional = true}
# For SIMD utf8 validation
simdutf8 = "0.1.4"
similar = "2.6.0"
streaming-iterator = {version = "0.1", optional = true}
# for division/remainder optimization at runtime
strength_reduce = {version = "0.2", optional = true}
thiserror = {workspace = true}
tracing = "0.1.40"
zstd = {version = "0.12", optional = true}

# parquet support
3 changes: 1 addition & 2 deletions src/arrow2/src/array/map/mod.rs
@@ -203,7 +203,6 @@ impl Array for MapArray {
impl_common_array!();

fn convert_logical_type(&self, target: DataType) -> Box<dyn Array> {
tracing::trace!("converting logical type to\n{target:#?}");
let outer_is_map = matches!(target, DataType::Map { .. });

if outer_is_map {
@@ -232,7 +231,7 @@ impl Array for MapArray {
field.change_type(target_inner.data_type.clone());

let offsets = self.offsets().clone();
let offsets = offsets.map(|offset| offset as i64);
let offsets = unsafe { offsets.map_unchecked(|offset| offset as i64) };

let list = ListArray::new(target, offsets, field, self.validity.clone());

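The offsets line above switches from `map` to the new `unsafe` `map_unchecked` (added in `src/arrow2/src/offset.rs` further down), since an arbitrary closure could break the offsets invariants. Widening `i32` offsets to `i64` keeps them monotonically increasing and keeps the first offset at zero, so this call site stays sound. A minimal standalone sketch of that reasoning, using only APIs that appear in this diff:

use arrow2::offset::OffsetsBuffer;

fn widen(offsets: &OffsetsBuffer<i32>) -> OffsetsBuffer<i64> {
    // Safety: widening i32 -> i64 preserves ordering and the leading zero,
    // which are exactly the invariants `map_unchecked` asks the caller to uphold.
    unsafe { offsets.map_unchecked(|offset| offset as i64) }
}

fn main() {
    // new_unchecked is sound here because [0, 2, 5] already satisfies the invariants.
    let offsets = unsafe { OffsetsBuffer::<i32>::new_unchecked(vec![0, 2, 5].into()) };
    let widened = widen(&offsets);
    assert_eq!(widened.buffer().as_slice(), &[0i64, 2, 5]);
}
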
22 changes: 12 additions & 10 deletions src/arrow2/src/array/mod.rs
@@ -54,6 +54,17 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static {
/// When the validity is [`None`], all slots are valid.
fn validity(&self) -> Option<&Bitmap>;

/// Returns an iterator over the direct children of this Array.
///
/// This method is useful for accessing child Arrays in composite types such as struct arrays.
/// By default, it returns an empty iterator, as most array types do not have child arrays.
///
/// # Returns
/// A boxed iterator yielding mutable references to child Arrays.
///
/// # Examples
/// For a StructArray, this would return an iterator over its field arrays.
/// For most other array types, this returns an empty iterator.
fn direct_children<'a>(&'a mut self) -> Box<dyn Iterator<Item = &'a mut dyn Array> + 'a> {
Box::new(core::iter::empty())
}
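
A hedged, self-contained sketch of how a composite type would override this default; it uses a toy trait with the same shape rather than the real `StructArray`, whose internal field names are not shown in this diff:

// Toy mirror of the `direct_children` pattern above; `Node` stands in for `Array`.
trait Node {
    fn direct_children<'a>(&'a mut self) -> Box<dyn Iterator<Item = &'a mut dyn Node> + 'a> {
        // Leaf-like types keep the default: no children.
        Box::new(core::iter::empty())
    }
    fn name(&self) -> &'static str;
}

struct Leaf(&'static str);

impl Node for Leaf {
    fn name(&self) -> &'static str {
        self.0
    }
}

struct Composite {
    children: Vec<Box<dyn Node>>,
}

impl Node for Composite {
    // A struct-like container overrides the default and yields each child mutably.
    fn direct_children<'a>(&'a mut self) -> Box<dyn Iterator<Item = &'a mut dyn Node> + 'a> {
        Box::new(self.children.iter_mut().map(|child| child.as_mut()))
    }

    fn name(&self) -> &'static str {
        "composite"
    }
}

fn main() {
    let mut root = Composite {
        children: vec![Box::new(Leaf("a")), Box::new(Leaf("b"))],
    };
    let names: Vec<_> = root.direct_children().map(|child| child.name()).collect();
    assert_eq!(names, ["a", "b"]);
}
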
@@ -636,16 +647,7 @@ macro_rules! impl_common_array {

fn change_type(&mut self, data_type: DataType) {
if data_type.to_physical_type() != self.data_type().to_physical_type() {
let from = format!("{:#?}", self.data_type());
let to = format!("{:#?}", data_type);


let diff = similar::TextDiff::from_lines(&from, &to);

let diff = diff
.unified_diff();

panic!("{diff}");
panic!("Cannot change array type from {:?} to {:?}", self.data_type(), data_type);
}

self.data_type = data_type.clone();
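The diff-based panic is replaced by a plain message: `change_type` only swaps the logical type and must never alter the physical layout. A sketch of that contract, assuming the fork's `Array::change_type` is callable as an ordinary trait method:

use arrow2::array::{Array, Int32Array};
use arrow2::datatypes::DataType;

fn main() {
    let mut array = Int32Array::from_slice([1, 2, 3]);

    // Date32 is also backed by i32, so the physical types match and the
    // logical type can be swapped in place.
    array.change_type(DataType::Date32);
    assert_eq!(array.data_type(), &DataType::Date32);

    // This would hit the panic above, because Utf8 does not share Int32's
    // physical layout:
    // array.change_type(DataType::Utf8);
}
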
2 changes: 0 additions & 2 deletions src/arrow2/src/datatypes/field.rs
@@ -25,8 +25,6 @@ pub struct Field {
impl Field {
/// Creates a new [`Field`].
pub fn new<T: Into<String>>(name: T, data_type: DataType, is_nullable: bool) -> Self {
let span = tracing::trace_span!("ArrowField::new", data_type = ?data_type, is_nullable);
let _guard = span.enter();
Field {
name: name.into(),
data_type,
39 changes: 37 additions & 2 deletions src/arrow2/src/datatypes/mod.rs
@@ -19,6 +19,9 @@ pub type Metadata = BTreeMap<String, String>;
/// typedef for [Option<(String, Option<String>)>] descr
pub(crate) type Extension = Option<(String, Option<String>)>;

#[allow(unused_imports, reason = "used in documentation")]
use crate::array::Array;

pub type ArrowDataType = DataType;
pub type ArrowField = Field;

@@ -163,16 +166,48 @@ pub enum DataType {
Extension(String, Box<DataType>, Option<String>),
}


impl DataType {
pub fn map(field: impl Into<Box<Field>>, keys_sorted: bool) -> Self {
Self::Map(field.into(), keys_sorted)
}

pub fn direct_children(&self, mut processor: impl FnMut(&DataType)) {
/// Processes the direct children data types of this DataType.
///
/// This method is useful for traversing the structure of complex data types.
/// It calls the provided closure for each immediate child data type.
///
/// This can be used in conjunction with the [`Array::direct_children`] method
/// to process both the data types and the corresponding array data.
///
/// # Arguments
///
/// * `processor` - A closure that takes a reference to a DataType as its argument.
///
/// # Examples
///
/// ```
/// use arrow2::datatypes::{DataType, Field};
///
/// let struct_type = DataType::Struct(vec![
/// Field::new("a", DataType::Int32, true),
/// Field::new("b", DataType::Utf8, false),
/// ]);
///
/// let mut child_types = Vec::new();
/// struct_type.direct_children(|child_type| {
/// child_types.push(child_type);
/// });
///
/// assert_eq!(child_types, vec![&DataType::Int32, &DataType::Utf8]);
/// println!("Child data types: {:?}", child_types);
/// ```
pub fn direct_children<'a>(&'a self, mut processor: impl FnMut(&'a DataType)) {
match self {
DataType::List(field) | DataType::FixedSizeList(field, _) | DataType::LargeList(field) | DataType::Map(field, ..) => processor(&field.data_type),
DataType::Struct(fields) | DataType::Union(fields, _, _) => fields.iter().for_each(|field| processor(&field.data_type)),
_ => {} // todo: might want to add more types here
DataType::Dictionary(_, value_type, _) => processor(value_type),
_ => {} // Other types don't have child data types
}
}
}
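As the doc comment notes, this pairs naturally with `Array::direct_children`. A hypothetical helper (not part of this commit) that pushes a target type one level down, assuming the fork's `change_type` and that both methods yield children in the same order:

use arrow2::array::Array;
use arrow2::datatypes::DataType;

// Hypothetical: retype each child array to the corresponding child of `target`.
// This mirrors what `MapArray::convert_logical_type` does for its inner field.
fn retarget_children(array: &mut dyn Array, target: &DataType) {
    let mut child_types = Vec::new();
    target.direct_children(|child| child_types.push(child.clone()));

    for (child, child_type) in array.direct_children().zip(child_types) {
        child.change_type(child_type);
    }
}
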
1 change: 1 addition & 0 deletions src/arrow2/src/error.rs
@@ -55,6 +55,7 @@ impl Error {
Self::OutOfSpec(msg.into())
}

#[allow(unused)]
pub(crate) fn nyi<A: Into<String>>(msg: A) -> Self {
Self::NotYetImplemented(msg.into())
}
61 changes: 43 additions & 18 deletions src/arrow2/src/offset.rs
@@ -1,9 +1,8 @@
//! Contains the declaration of [`Offset`]
use std::hint::unreachable_unchecked;

use crate::buffer::Buffer;
use crate::error::Error;
pub use crate::types::Offset;
use crate::{buffer::Buffer, error::Error};

/// A wrapper type of [`Vec<O>`] representing the invariants of Arrow's offsets.
/// It is guaranteed to (sound to assume that):
@@ -71,7 +70,7 @@ impl<O: Offset> Offsets<O> {

/// Creates a new [`Offsets`] from an iterator of lengths
#[inline]
pub fn try_from_iter<I: IntoIterator<Item=usize>>(iter: I) -> Result<Self, Error> {
pub fn try_from_iter<I: IntoIterator<Item = usize>>(iter: I) -> Result<Self, Error> {
let iterator = iter.into_iter();
let (lower, _) = iterator.size_hint();
let mut offsets = Self::with_capacity(lower);
@@ -144,7 +143,9 @@ impl<O: Offset> Offsets<O> {
/// Returns the last offset of this container.
#[inline]
pub fn last(&self) -> &O {
self.0.last().unwrap_or_else(|| unsafe { unreachable_unchecked() })
self.0
.last()
.unwrap_or_else(|| unsafe { unreachable_unchecked() })
}

/// Returns a range (start, end) corresponding to the position `index`
@@ -212,7 +213,7 @@ impl<O: Offset> Offsets<O> {
/// # Errors
/// This function errors iff this operation overflows for the maximum value of `O`.
#[inline]
pub fn try_from_lengths<I: Iterator<Item=usize>>(lengths: I) -> Result<Self, Error> {
pub fn try_from_lengths<I: Iterator<Item = usize>>(lengths: I) -> Result<Self, Error> {
let mut self_ = Self::with_capacity(lengths.size_hint().0);
self_.try_extend_from_lengths(lengths)?;
Ok(self_)
@@ -222,7 +223,7 @@ impl<O: Offset> Offsets<O> {
/// # Errors
/// This function errors iff this operation overflows for the maximum value of `O`.
#[inline]
pub fn try_extend_from_lengths<I: Iterator<Item=usize>>(
pub fn try_extend_from_lengths<I: Iterator<Item = usize>>(
&mut self,
lengths: I,
) -> Result<(), Error> {
@@ -344,12 +345,34 @@ impl<O: Offset> Default for OffsetsBuffer<O> {
}
}

impl <O: Copy> OffsetsBuffer<O> {
pub fn map<T>(&self, f: impl Fn(O) -> T) -> OffsetsBuffer<T> {
let buffer = self.0.iter()
.copied()
.map(f)
.collect();
impl<O: Copy> OffsetsBuffer<O> {

/// Maps each offset to a new value, creating a new [`Self`].
///
/// # Safety
///
/// This function is marked as `unsafe` because it does not check whether the resulting offsets
/// maintain the invariants required by [`OffsetsBuffer`]. The caller must ensure that:
///
/// - The resulting offsets are monotonically increasing.
/// - The first offset is zero.
/// - All offsets are non-negative.
///
/// Violating these invariants can lead to undefined behavior when using the resulting [`OffsetsBuffer`].
///
/// # Example
///
/// ```
/// # use arrow2::offset::OffsetsBuffer;
/// # let offsets = unsafe { OffsetsBuffer::new_unchecked(vec![0, 2, 5, 7].into()) };
/// let doubled = unsafe { offsets.map_unchecked(|x| x * 2) };
/// assert_eq!(doubled.buffer().as_slice(), &[0, 4, 10, 14]);
/// ```
///
/// Note that in this example, doubling the offsets maintains the required invariants,
/// but this may not be true for all transformations.
pub unsafe fn map_unchecked<T>(&self, f: impl Fn(O) -> T) -> OffsetsBuffer<T> {
let buffer = self.0.iter().copied().map(f).collect();

OffsetsBuffer(buffer)
}
@@ -363,7 +386,6 @@ impl<O: Offset> OffsetsBuffer<O> {
Self(offsets)
}


/// Returns an empty [`OffsetsBuffer`] (i.e. with a single element, the zero)
#[inline]
pub fn new() -> Self {
@@ -410,7 +432,7 @@ impl<O: Offset> OffsetsBuffer<O> {
*self.last() - *self.first()
}

pub fn ranges(&self) -> impl Iterator<Item=core::ops::Range<O>> + '_ {
pub fn ranges(&self) -> impl Iterator<Item = core::ops::Range<O>> + '_ {
self.0.windows(2).map(|w| {
let from = w[0];
let to = w[1];
@@ -419,17 +441,20 @@ impl<O: Offset> OffsetsBuffer<O> {
})
}


/// Returns the first offset.
#[inline]
pub fn first(&self) -> &O {
self.0.first().unwrap_or_else(|| unsafe { unreachable_unchecked() })
self.0
.first()
.unwrap_or_else(|| unsafe { unreachable_unchecked() })
}

/// Returns the last offset.
#[inline]
pub fn last(&self) -> &O {
self.0.last().unwrap_or_else(|| unsafe { unreachable_unchecked() })
self.0
.last()
.unwrap_or_else(|| unsafe { unreachable_unchecked() })
}

/// Returns a range (start, end) corresponding to the position `index`
@@ -473,7 +498,7 @@ impl<O: Offset> OffsetsBuffer<O> {

/// Returns an iterator with the lengths of the offsets
#[inline]
pub fn lengths(&self) -> impl Iterator<Item=usize> + '_ {
pub fn lengths(&self) -> impl Iterator<Item = usize> + '_ {
self.0.windows(2).map(|w| (w[1] - w[0]).to_usize())
}

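A small illustration of the accessors touched in this file, showing how `lengths`, `ranges`, `first`, and `last` all derive from the same monotonically increasing buffer; every call here appears in the diff above:

use arrow2::offset::OffsetsBuffer;

fn main() {
    // Offsets [0, 2, 5, 7] describe three consecutive slots of lengths 2, 3 and 2.
    let offsets = unsafe { OffsetsBuffer::<i32>::new_unchecked(vec![0, 2, 5, 7].into()) };

    assert_eq!(offsets.lengths().collect::<Vec<_>>(), vec![2, 3, 2]);
    assert_eq!(offsets.ranges().collect::<Vec<_>>(), vec![0..2, 2..5, 5..7]);
    assert_eq!(*offsets.first(), 0);
    assert_eq!(*offsets.last(), 7);
}
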
2 changes: 1 addition & 1 deletion src/daft-core/src/series/from.rs
@@ -97,7 +97,7 @@ mod tests {
}

#[test]
fn test_() -> DaftResult<()> {
fn test_map_array_conversion() -> DaftResult<()> {
use arrow2::array::MapArray;

use super::*;
1 change: 0 additions & 1 deletion src/daft-dsl/Cargo.toml
@@ -13,7 +13,6 @@ itertools = {workspace = true}
log = {workspace = true}
pyo3 = {workspace = true, optional = true}
serde = {workspace = true}
tracing = "0.1.40"
typetag = "0.2.16"

[features]
2 changes: 0 additions & 2 deletions src/daft-dsl/src/functions/map/get.rs
@@ -31,8 +31,6 @@ impl FunctionEvaluator for GetEvaluator {

let field = Field::new("value", *value);

tracing::debug!("Field: {:?}", field);

Ok(field)
}

1 change: 0 additions & 1 deletion src/daft-table/Cargo.toml
@@ -13,7 +13,6 @@ num-traits = {workspace = true}
pyo3 = {workspace = true, optional = true}
rand = {workspace = true}
serde = {workspace = true}
tracing = "0.1.40"

[features]
python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "common-arrow-ffi/python", "common-display/python", "daft-image/python"]
5 changes: 0 additions & 5 deletions src/daft-table/src/lib.rs
@@ -497,9 +497,6 @@ impl Table {
fn eval_expression(&self, expr: &Expr) -> DaftResult<Series> {
use crate::Expr::*;

let span = tracing::trace_span!("DataFrame::eval_expression", expr = ?expr);
let _guard = span.enter();

let expected_field = expr.to_field(self.schema.as_ref())?;
let series = match expr {
Alias(child, name) => Ok(self.eval_expression(child)?.rename(name)),
@@ -578,8 +575,6 @@ impl Table {
},
}?;

tracing::trace!("Series of {expr:?} -> {series:#?}");

if expected_field.name != series.field().name {
return Err(DaftError::ComputeError(format!(
"Mismatch of expected expression name and name from computed series ({} vs {}) for expression: {expr}",