diff --git a/Cargo.lock b/Cargo.lock index 73612ec..9cfdd51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1620,8 +1620,6 @@ dependencies = [ [[package]] name = "pyo3-arrow" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6d66e6404efa94448d1648868170d9e50a9e6b4e9c3ec336605587af70ad28c" dependencies = [ "arrow", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index 3c44492..29ce133 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,8 +35,8 @@ numpy = "0.23" object_store = "0.11" parquet = "53" pyo3 = { version = "0.23", features = ["macros", "indexmap"] } -pyo3-arrow = "0.6" -# pyo3-arrow = { path = "./pyo3-arrow" } +# pyo3-arrow = "0.6" +pyo3-arrow = { path = "./pyo3-arrow" } pyo3-async-runtimes = { version = "0.23", features = ["tokio-runtime"] } pyo3-file = "0.10" pyo3-object_store = { git = "https://github.com/developmentseed/object-store-rs", rev = "bad34862a92849dd7b69c28cd4c225446d3d15ab" } diff --git a/arro3-compute/src/aggregate.rs b/arro3-compute/src/aggregate.rs index 984bcbb..75b69dc 100644 --- a/arro3-compute/src/aggregate.rs +++ b/arro3-compute/src/aggregate.rs @@ -13,16 +13,17 @@ use arrow_schema::{ArrowError, DataType}; use arrow_select::concat; use pyo3::prelude::*; use pyo3_arrow::error::PyArrowResult; +use pyo3_arrow::export::Arro3Scalar; use pyo3_arrow::input::AnyArray; use pyo3_arrow::PyScalar; #[pyfunction] -pub fn max(py: Python, input: AnyArray) -> PyArrowResult { +pub fn max(input: AnyArray) -> PyArrowResult { match input { AnyArray::Array(array) => { let (array, field) = array.into_inner(); let result = max_array(array)?; - Ok(PyScalar::try_new(result, field)?.to_arro3(py)?) + Ok(PyScalar::try_new(result, field)?.into()) } AnyArray::Stream(stream) => { let reader = stream.into_reader()?; @@ -43,7 +44,7 @@ pub fn max(py: Python, input: AnyArray) -> PyArrowResult { // Call max_array on intermediate outputs let result = max_array(concatted)?; - Ok(PyScalar::try_new(result, field)?.to_arro3(py)?) + Ok(PyScalar::try_new(result, field)?.into()) } } } @@ -112,12 +113,12 @@ fn max_boolean(array: &BooleanArray) -> ArrayRef { } #[pyfunction] -pub fn min(py: Python, input: AnyArray) -> PyArrowResult { +pub fn min(input: AnyArray) -> PyArrowResult { match input { AnyArray::Array(array) => { let (array, field) = array.into_inner(); let result = min_array(array)?; - Ok(PyScalar::try_new(result, field)?.to_arro3(py)?) + Ok(PyScalar::try_new(result, field)?.into()) } AnyArray::Stream(stream) => { let reader = stream.into_reader()?; @@ -138,7 +139,7 @@ pub fn min(py: Python, input: AnyArray) -> PyArrowResult { // Call min_array on intermediate outputs let result = min_array(concatted)?; - Ok(PyScalar::try_new(result, field)?.to_arro3(py)?) + Ok(PyScalar::try_new(result, field)?.into()) } } } @@ -207,12 +208,12 @@ fn min_boolean(array: &BooleanArray) -> ArrayRef { } #[pyfunction] -pub fn sum(py: Python, input: AnyArray) -> PyArrowResult { +pub fn sum(input: AnyArray) -> PyArrowResult { match input { AnyArray::Array(array) => { let (array, field) = array.into_inner(); let result = sum_array(array)?; - Ok(PyScalar::try_new(result, field)?.to_arro3(py)?) + Ok(PyScalar::try_new(result, field)?.into()) } AnyArray::Stream(stream) => { let reader = stream.into_reader()?; @@ -233,7 +234,7 @@ pub fn sum(py: Python, input: AnyArray) -> PyArrowResult { // Call sum_array on intermediate outputs let result = sum_array(concatted)?; - Ok(PyScalar::try_new(result, field)?.to_arro3(py)?) + Ok(PyScalar::try_new(result, field)?.into()) } } } diff --git a/arro3-compute/src/arith.rs b/arro3-compute/src/arith.rs index c0a4c01..78f74d2 100644 --- a/arro3-compute/src/arith.rs +++ b/arro3-compute/src/arith.rs @@ -6,50 +6,72 @@ use pyo3_arrow::PyArray; #[pyfunction] pub fn add(py: Python, lhs: AnyDatum, rhs: AnyDatum) -> PyArrowResult { - Ok(PyArray::from_array_ref(numeric::add(&lhs, &rhs)?).to_arro3(py)?) + Ok(PyArray::from_array_ref(numeric::add(&lhs, &rhs)?) + .to_arro3(py)? + .unbind()) } #[pyfunction] pub fn add_wrapping(py: Python, lhs: AnyDatum, rhs: AnyDatum) -> PyArrowResult { - Ok(PyArray::from_array_ref(numeric::add_wrapping(&lhs, &rhs)?).to_arro3(py)?) + Ok(PyArray::from_array_ref(numeric::add_wrapping(&lhs, &rhs)?) + .to_arro3(py)? + .unbind()) } #[pyfunction] pub fn div(py: Python, lhs: AnyDatum, rhs: AnyDatum) -> PyArrowResult { - Ok(PyArray::from_array_ref(numeric::div(&lhs, &rhs)?).to_arro3(py)?) + Ok(PyArray::from_array_ref(numeric::div(&lhs, &rhs)?) + .to_arro3(py)? + .unbind()) } #[pyfunction] pub fn mul(py: Python, lhs: AnyDatum, rhs: AnyDatum) -> PyArrowResult { - Ok(PyArray::from_array_ref(numeric::mul(&lhs, &rhs)?).to_arro3(py)?) + Ok(PyArray::from_array_ref(numeric::mul(&lhs, &rhs)?) + .to_arro3(py)? + .unbind()) } #[pyfunction] pub fn mul_wrapping(py: Python, lhs: AnyDatum, rhs: AnyDatum) -> PyArrowResult { - Ok(PyArray::from_array_ref(numeric::mul_wrapping(&lhs, &rhs)?).to_arro3(py)?) + Ok(PyArray::from_array_ref(numeric::mul_wrapping(&lhs, &rhs)?) + .to_arro3(py)? + .unbind()) } #[pyfunction] pub fn neg(py: Python, array: PyArray) -> PyArrowResult { - Ok(PyArray::from_array_ref(numeric::neg(array.as_ref())?).to_arro3(py)?) + Ok(PyArray::from_array_ref(numeric::neg(array.as_ref())?) + .to_arro3(py)? + .unbind()) } #[pyfunction] pub fn neg_wrapping(py: Python, array: PyArray) -> PyArrowResult { - Ok(PyArray::from_array_ref(numeric::neg_wrapping(array.as_ref())?).to_arro3(py)?) + Ok( + PyArray::from_array_ref(numeric::neg_wrapping(array.as_ref())?) + .to_arro3(py)? + .unbind(), + ) } #[pyfunction] pub fn rem(py: Python, lhs: AnyDatum, rhs: AnyDatum) -> PyArrowResult { - Ok(PyArray::from_array_ref(numeric::rem(&lhs, &rhs)?).to_arro3(py)?) + Ok(PyArray::from_array_ref(numeric::rem(&lhs, &rhs)?) + .to_arro3(py)? + .unbind()) } #[pyfunction] pub fn sub(py: Python, lhs: AnyDatum, rhs: AnyDatum) -> PyArrowResult { - Ok(PyArray::from_array_ref(numeric::sub(&lhs, &rhs)?).to_arro3(py)?) + Ok(PyArray::from_array_ref(numeric::sub(&lhs, &rhs)?) + .to_arro3(py)? + .unbind()) } #[pyfunction] pub fn sub_wrapping(py: Python, lhs: AnyDatum, rhs: AnyDatum) -> PyArrowResult { - Ok(PyArray::from_array_ref(numeric::sub_wrapping(&lhs, &rhs)?).to_arro3(py)?) + Ok(PyArray::from_array_ref(numeric::sub_wrapping(&lhs, &rhs)?) + .to_arro3(py)? + .unbind()) } diff --git a/arro3-compute/src/boolean.rs b/arro3-compute/src/boolean.rs index 3cff6a1..ce7b349 100644 --- a/arro3-compute/src/boolean.rs +++ b/arro3-compute/src/boolean.rs @@ -13,7 +13,9 @@ pub fn is_null(py: Python, input: AnyArray) -> PyArrowResult { match input { AnyArray::Array(input) => { let out = arrow::compute::is_null(input.as_ref())?; - Ok(PyArray::from_array_ref(Arc::new(out)).to_arro3(py)?) + Ok(PyArray::from_array_ref(Arc::new(out)) + .to_arro3(py)? + .unbind()) } AnyArray::Stream(input) => { let input = input.into_reader()?; @@ -25,7 +27,8 @@ pub fn is_null(py: Python, input: AnyArray) -> PyArrowResult { }); Ok( PyArrayReader::new(Box::new(ArrayIterator::new(iter, out_field.into()))) - .to_arro3(py)?, + .to_arro3(py)? + .unbind(), ) } } @@ -36,7 +39,9 @@ pub fn is_not_null(py: Python, input: AnyArray) -> PyArrowResult { match input { AnyArray::Array(input) => { let out = arrow::compute::is_not_null(input.as_ref())?; - Ok(PyArray::from_array_ref(Arc::new(out)).to_arro3(py)?) + Ok(PyArray::from_array_ref(Arc::new(out)) + .to_arro3(py)? + .unbind()) } AnyArray::Stream(input) => { let input = input.into_reader()?; @@ -48,7 +53,8 @@ pub fn is_not_null(py: Python, input: AnyArray) -> PyArrowResult { }); Ok( PyArrayReader::new(Box::new(ArrayIterator::new(iter, out_field.into()))) - .to_arro3(py)?, + .to_arro3(py)? + .unbind(), ) } } diff --git a/arro3-compute/src/cast.rs b/arro3-compute/src/cast.rs index fdad3fa..7d728bf 100644 --- a/arro3-compute/src/cast.rs +++ b/arro3-compute/src/cast.rs @@ -17,7 +17,7 @@ pub fn cast(py: Python, input: AnyArray, to_type: PyField) -> PyArrowResult { let new_field = to_type.into_inner(); let out = arrow_cast::cast(arr.as_ref(), new_field.data_type())?; - Ok(PyArray::new(out, new_field).to_arro3(py)?) + Ok(PyArray::new(out, new_field).to_arro3(py)?.unbind()) } AnyArray::Stream(stream) => { let reader = stream.into_reader()?; @@ -36,7 +36,11 @@ pub fn cast(py: Python, input: AnyArray, to_type: PyField) -> PyArrowResult PyArrowResult { let (chunks, field) = input.into_inner(); let array_refs = chunks.iter().map(|arr| arr.as_ref()).collect::>(); let concatted = arrow_select::concat::concat(array_refs.as_slice())?; - Ok(PyArray::new(concatted, field).to_arro3(py)?) + Ok(PyArray::new(concatted, field).to_arro3(py)?.unbind()) } diff --git a/arro3-compute/src/dictionary.rs b/arro3-compute/src/dictionary.rs index 49fb273..11d2b9d 100644 --- a/arro3-compute/src/dictionary.rs +++ b/arro3-compute/src/dictionary.rs @@ -20,7 +20,7 @@ pub(crate) fn dictionary_encode(py: Python, array: AnyArray) -> PyArrowResult { let (array, _field) = array.into_inner(); let output_array = dictionary_encode_array(array)?; - Ok(PyArray::from_array_ref(output_array).to_arro3(py)?) + Ok(PyArray::from_array_ref(output_array).to_arro3(py)?.unbind()) } AnyArray::Stream(stream) => { let reader = stream.into_reader()?; @@ -37,7 +37,8 @@ pub(crate) fn dictionary_encode(py: Python, array: AnyArray) -> PyArrowResult PyArrowResul ))?; let filtered = arrow::compute::filter(values.as_ref(), predicate)?; - Ok(PyArray::new(filtered, values_field).to_arro3(py)?) + Ok(PyArray::new(filtered, values_field).to_arro3(py)?.unbind()) } (AnyArray::Stream(values), AnyArray::Stream(predicate)) => { let values = values.into_reader()?; @@ -47,7 +47,8 @@ pub fn filter(py: Python, values: AnyArray, predicate: AnyArray) -> PyArrowResul }); Ok( PyArrayReader::new(Box::new(ArrayIterator::new(iter, values_field))) - .to_arro3(py)?, + .to_arro3(py)? + .unbind(), ) } _ => Err(PyValueError::new_err("Unsupported combination of array and stream").into()), diff --git a/arro3-compute/src/take.rs b/arro3-compute/src/take.rs index 333ef8e..7336b11 100644 --- a/arro3-compute/src/take.rs +++ b/arro3-compute/src/take.rs @@ -8,5 +8,7 @@ use pyo3_arrow::PyArray; pub fn take(py: Python, values: PyArray, indices: PyArray) -> PyArrowResult { let output_array = py.allow_threads(|| arrow_select::take::take(values.as_ref(), indices.as_ref(), None))?; - Ok(PyArray::new(output_array, values.field().clone()).to_arro3(py)?) + Ok(PyArray::new(output_array, values.field().clone()) + .to_arro3(py)? + .unbind()) } diff --git a/arro3-compute/src/temporal.rs b/arro3-compute/src/temporal.rs index c02dbc1..0597885 100644 --- a/arro3-compute/src/temporal.rs +++ b/arro3-compute/src/temporal.rs @@ -86,7 +86,7 @@ pub fn date_part(py: Python, input: AnyArray, part: DatePart) -> PyArrowResult

{ let out = arrow::compute::date_part(input.as_ref(), part.into())?; - Ok(PyArray::from_array_ref(out).to_arro3(py)?) + Ok(PyArray::from_array_ref(out).to_arro3(py)?.unbind()) } AnyArray::Stream(stream) => { let reader = stream.into_reader()?; @@ -98,7 +98,8 @@ pub fn date_part(py: Python, input: AnyArray, part: DatePart) -> PyArrowResult

PyArrowResult

{ let (array, _field) = array.into_inner(); let output_array = _dictionary_indices(array)?; - Ok(PyArray::from_array_ref(output_array).to_arro3(py)?) + Ok(PyArray::from_array_ref(output_array).to_arro3(py)?.unbind()) } AnyArray::Stream(stream) => { let reader = stream.into_reader()?; @@ -34,7 +34,8 @@ pub(crate) fn dictionary_indices(py: Python, array: AnyArray) -> PyArrowResult

PyArrowResul AnyArray::Array(array) => { let (array, _field) = array.into_inner(); let output_array = _dictionary_dictionary(array)?; - Ok(PyArray::from_array_ref(output_array).to_arro3(py)?) + Ok(PyArray::from_array_ref(output_array).to_arro3(py)?.unbind()) } AnyArray::Stream(stream) => { let reader = stream.into_reader()?; @@ -70,7 +71,8 @@ pub(crate) fn dictionary_dictionary(py: Python, array: AnyArray) -> PyArrowResul .map(move |array| _dictionary_dictionary(array?)); Ok( PyArrayReader::new(Box::new(ArrayIterator::new(iter, out_field.into()))) - .to_arro3(py)?, + .to_arro3(py)? + .unbind(), ) } } diff --git a/arro3-core/src/accessors/list_flatten.rs b/arro3-core/src/accessors/list_flatten.rs index 0ebd24c..7af6afd 100644 --- a/arro3-core/src/accessors/list_flatten.rs +++ b/arro3-core/src/accessors/list_flatten.rs @@ -14,7 +14,7 @@ pub fn list_flatten(py: Python, input: AnyArray) -> PyArrowResult { let (array, field) = array.into_inner(); let flat_array = flatten_array(array)?; let flat_field = flatten_field(field)?; - Ok(PyArray::new(flat_array, flat_field).to_arro3(py)?) + Ok(PyArray::new(flat_array, flat_field).to_arro3(py)?.unbind()) } AnyArray::Stream(stream) => { let reader = stream.into_reader()?; @@ -26,7 +26,8 @@ pub fn list_flatten(py: Python, input: AnyArray) -> PyArrowResult { }); Ok( PyArrayReader::new(Box::new(ArrayIterator::new(iter, flatten_field))) - .to_arro3(py)?, + .to_arro3(py)? + .unbind(), ) } } diff --git a/arro3-core/src/accessors/list_offsets.rs b/arro3-core/src/accessors/list_offsets.rs index 08e81f9..38104b7 100644 --- a/arro3-core/src/accessors/list_offsets.rs +++ b/arro3-core/src/accessors/list_offsets.rs @@ -17,7 +17,7 @@ pub fn list_offsets(py: Python, input: AnyArray, logical: bool) -> PyArrowResult AnyArray::Array(array) => { let (array, _field) = array.into_inner(); let offsets = _list_offsets(array, logical)?; - Ok(PyArray::from_array_ref(offsets).to_arro3(py)?) + Ok(PyArray::from_array_ref(offsets).to_arro3(py)?.unbind()) } AnyArray::Stream(stream) => { let reader = stream.into_reader()?; @@ -36,7 +36,8 @@ pub fn list_offsets(py: Python, input: AnyArray, logical: bool) -> PyArrowResult .map(move |array| _list_offsets(array?, logical)); Ok( PyArrayReader::new(Box::new(ArrayIterator::new(iter, out_field.into()))) - .to_arro3(py)?, + .to_arro3(py)? + .unbind(), ) } } diff --git a/arro3-core/src/accessors/struct_field.rs b/arro3-core/src/accessors/struct_field.rs index f09f5eb..3546520 100644 --- a/arro3-core/src/accessors/struct_field.rs +++ b/arro3-core/src/accessors/struct_field.rs @@ -40,7 +40,8 @@ pub(crate) fn struct_field( array_ref.slice(orig_array.offset(), orig_array.len()), field_ref.clone(), ) - .to_arro3(py)?) + .to_arro3(py)? + .unbind()) } fn get_child(array: &ArrayRef, i: usize) -> Result<(&ArrayRef, &FieldRef), ArrowError> { diff --git a/arro3-core/src/constructors.rs b/arro3-core/src/constructors.rs index c156a85..7bc7757 100644 --- a/arro3-core/src/constructors.rs +++ b/arro3-core/src/constructors.rs @@ -9,16 +9,16 @@ use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3_arrow::error::PyArrowResult; +use pyo3_arrow::export::Arro3Array; use pyo3_arrow::{PyArray, PyField}; #[pyfunction] #[pyo3(signature=(values, list_size, *, r#type=None))] pub(crate) fn fixed_size_list_array( - py: Python, values: PyArray, list_size: i32, r#type: Option, -) -> PyArrowResult { +) -> PyArrowResult { let (values_array, values_field) = values.into_inner(); let output_field = r#type.map(|t| t.into_inner()).unwrap_or_else(|| { Arc::new(Field::new( @@ -36,17 +36,16 @@ pub(crate) fn fixed_size_list_array( } }; let array = FixedSizeListArray::try_new(inner_field.clone(), list_size, values_array, None)?; - Ok(PyArray::new(Arc::new(array), output_field).to_arro3(py)?) + Ok(PyArray::new(Arc::new(array), output_field).into()) } #[pyfunction] #[pyo3(signature=(offsets, values, *, r#type=None))] pub(crate) fn list_array( - py: Python, offsets: PyArray, values: PyArray, r#type: Option, -) -> PyArrowResult { +) -> PyArrowResult { let (values_array, values_field) = values.into_inner(); let (offsets_array, _) = offsets.into_inner(); let large_offsets = match offsets_array.data_type() { @@ -93,17 +92,16 @@ pub(crate) fn list_array( None, )?) }; - Ok(PyArray::new(Arc::new(list_array), output_field).to_arro3(py)?) + Ok(PyArray::new(Arc::new(list_array), output_field).into()) } #[pyfunction] #[pyo3(signature=(arrays, *, fields, r#type=None))] pub(crate) fn struct_array( - py: Python, arrays: Vec, fields: Vec, r#type: Option, -) -> PyArrowResult { +) -> PyArrowResult { let output_field = r#type.map(|t| t.into_inner()).unwrap_or_else(|| { let fields = fields .into_iter() @@ -125,5 +123,5 @@ pub(crate) fn struct_array( .collect::>(); let array = StructArray::try_new(inner_fields, arrays, None)?; - Ok(PyArray::new(Arc::new(array), output_field).to_arro3(py)?) + Ok(PyArray::new(Arc::new(array), output_field).into()) } diff --git a/arro3-io/src/csv.rs b/arro3-io/src/csv.rs index d4788f3..d0f77f0 100644 --- a/arro3-io/src/csv.rs +++ b/arro3-io/src/csv.rs @@ -4,6 +4,7 @@ use arrow_csv::reader::Format; use arrow_csv::{ReaderBuilder, WriterBuilder}; use pyo3::prelude::*; use pyo3_arrow::error::PyArrowResult; +use pyo3_arrow::export::{Arro3RecordBatchReader, Arro3Schema}; use pyo3_arrow::input::AnyRecordBatch; use pyo3_arrow::{PyRecordBatchReader, PySchema}; @@ -24,7 +25,6 @@ use crate::utils::{FileReader, FileWriter}; ))] #[allow(clippy::too_many_arguments)] pub fn infer_csv_schema( - py: Python, file: FileReader, has_header: Option, max_records: Option, @@ -33,7 +33,7 @@ pub fn infer_csv_schema( quote: Option, terminator: Option, comment: Option, -) -> PyArrowResult { +) -> PyArrowResult { let mut format = Format::default(); if let Some(has_header) = has_header { @@ -57,7 +57,7 @@ pub fn infer_csv_schema( let buf_file = BufReader::new(file); let (schema, _records_read) = format.infer_schema(buf_file, max_records)?; - Ok(PySchema::new(schema.into()).to_arro3(py)?) + Ok(schema.into()) } /// Read a CSV file to an Arrow RecordBatchReader @@ -76,7 +76,6 @@ pub fn infer_csv_schema( ))] #[allow(clippy::too_many_arguments)] pub fn read_csv( - py: Python, file: FileReader, schema: PySchema, has_header: Option, @@ -86,7 +85,7 @@ pub fn read_csv( quote: Option, terminator: Option, comment: Option, -) -> PyArrowResult { +) -> PyArrowResult { let mut builder = ReaderBuilder::new(schema.into()); if let Some(has_header) = has_header { @@ -112,7 +111,7 @@ pub fn read_csv( } let reader = builder.build(file)?; - Ok(PyRecordBatchReader::new(Box::new(reader)).to_arro3(py)?) + Ok(PyRecordBatchReader::new(Box::new(reader)).into()) } /// Write an Arrow Table or stream to a CSV file diff --git a/arro3-io/src/ipc.rs b/arro3-io/src/ipc.rs index b922f7a..0e143ad 100644 --- a/arro3-io/src/ipc.rs +++ b/arro3-io/src/ipc.rs @@ -5,6 +5,7 @@ use arrow_ipc::writer::IpcWriteOptions; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3_arrow::error::PyArrowResult; +use pyo3_arrow::export::Arro3RecordBatchReader; use pyo3_arrow::input::AnyRecordBatch; use pyo3_arrow::PyRecordBatchReader; @@ -12,18 +13,18 @@ use crate::utils::{FileReader, FileWriter}; /// Read an Arrow IPC file to an Arrow RecordBatchReader #[pyfunction] -pub fn read_ipc(py: Python, file: FileReader) -> PyArrowResult { +pub fn read_ipc(file: FileReader) -> PyArrowResult { let builder = FileReaderBuilder::new(); let buf_file = BufReader::new(file); let reader = builder.build(buf_file)?; - Ok(PyRecordBatchReader::new(Box::new(reader)).to_arro3(py)?) + Ok(PyRecordBatchReader::new(Box::new(reader)).into()) } /// Read an Arrow IPC Stream file to an Arrow RecordBatchReader #[pyfunction] -pub fn read_ipc_stream(py: Python, file: FileReader) -> PyArrowResult { +pub fn read_ipc_stream(file: FileReader) -> PyArrowResult { let reader = StreamReader::try_new(file, None)?; - Ok(PyRecordBatchReader::new(Box::new(reader)).to_arro3(py)?) + Ok(PyRecordBatchReader::new(Box::new(reader)).into()) } #[allow(clippy::upper_case_acronyms)] diff --git a/arro3-io/src/json.rs b/arro3-io/src/json.rs index 4128850..f32abd3 100644 --- a/arro3-io/src/json.rs +++ b/arro3-io/src/json.rs @@ -4,6 +4,7 @@ use arrow::json::writer::{JsonArray, LineDelimited}; use arrow::json::{ReaderBuilder, WriterBuilder}; use pyo3::prelude::*; use pyo3_arrow::error::PyArrowResult; +use pyo3_arrow::export::{Arro3RecordBatchReader, Arro3Schema}; use pyo3_arrow::input::AnyRecordBatch; use pyo3_arrow::{PyRecordBatchReader, PySchema}; @@ -17,13 +18,12 @@ use crate::utils::{FileReader, FileWriter}; max_records=None, ))] pub fn infer_json_schema( - py: Python, file: FileReader, max_records: Option, -) -> PyArrowResult { +) -> PyArrowResult { let buf_file = BufReader::new(file); let (schema, _records_read) = arrow::json::reader::infer_json_schema(buf_file, max_records)?; - Ok(PySchema::new(schema.into()).to_arro3(py)?) + Ok(schema.into()) } /// Read a JSON file to an Arrow RecordBatchReader @@ -35,11 +35,10 @@ pub fn infer_json_schema( batch_size=None, ))] pub fn read_json( - py: Python, file: FileReader, schema: PySchema, batch_size: Option, -) -> PyArrowResult { +) -> PyArrowResult { let mut builder = ReaderBuilder::new(schema.into()); if let Some(batch_size) = batch_size { @@ -48,7 +47,7 @@ pub fn read_json( let buf_file = BufReader::new(file); let reader = builder.build(buf_file)?; - Ok(PyRecordBatchReader::new(Box::new(reader)).to_arro3(py)?) + Ok(PyRecordBatchReader::new(Box::new(reader)).into()) } /// Write an Arrow Table or stream to a JSON file diff --git a/arro3-io/src/parquet.rs b/arro3-io/src/parquet.rs index 09d7fbd..3020637 100644 --- a/arro3-io/src/parquet.rs +++ b/arro3-io/src/parquet.rs @@ -14,6 +14,7 @@ use parquet::schema::types::ColumnPath; use pyo3::exceptions::{PyTypeError, PyValueError}; use pyo3::prelude::*; use pyo3_arrow::error::PyArrowResult; +use pyo3_arrow::export::Arro3RecordBatchReader; use pyo3_arrow::input::AnyRecordBatch; use pyo3_arrow::{PyRecordBatchReader, PyTable}; use pyo3_object_store::PyObjectStore; @@ -22,7 +23,7 @@ use crate::error::Arro3IoResult; use crate::utils::{FileReader, FileWriter}; #[pyfunction] -pub fn read_parquet(py: Python, file: FileReader) -> PyArrowResult { +pub fn read_parquet(file: FileReader) -> PyArrowResult { let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); let metadata = builder.schema().metadata().clone(); @@ -40,7 +41,7 @@ pub fn read_parquet(py: Python, file: FileReader) -> PyArrowResult { // https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ParquetRecordBatchReader.html#method.schema // https://github.com/apache/arrow-rs/pull/5135 let iter = Box::new(RecordBatchIterator::new(reader, arrow_schema)); - Ok(PyRecordBatchReader::new(iter).to_arro3(py)?) + Ok(PyRecordBatchReader::new(iter).into()) } #[pyfunction] diff --git a/pyo3-arrow/src/array.rs b/pyo3-arrow/src/array.rs index 0a230f5..0d16c08 100644 --- a/pyo3-arrow/src/array.rs +++ b/pyo3-arrow/src/array.rs @@ -20,6 +20,7 @@ use pyo3::{intern, IntoPyObjectExt}; #[cfg(feature = "buffer_protocol")] use crate::buffer::AnyBufferProtocol; use crate::error::PyArrowResult; +use crate::export::{Arro3Array, Arro3DataType, Arro3Field}; use crate::ffi::from_python::utils::import_array_pycapsules; use crate::ffi::to_python::nanoarrow::to_nanoarrow_array; use crate::ffi::{to_array_pycapsules, to_schema_pycapsule}; @@ -38,6 +39,7 @@ use crate::{PyDataType, PyField}; /// Field](https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html#arrow_c_array__). /// In particular, storing a [FieldRef] is required to persist Arrow extension metadata through the /// C Data Interface. +#[derive(Debug)] #[pyclass(module = "arro3.core._core", name = "Array", subclass)] pub struct PyArray { array: ArrayRef, @@ -98,29 +100,28 @@ impl PyArray { /// Export to an arro3.core.Array. /// /// This requires that you depend on arro3-core from your Python package. - pub fn to_arro3(&self, py: Python) -> PyResult { + pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult> { let arro3_mod = py.import(intern!(py, "arro3.core"))?; - let core_obj = arro3_mod.getattr(intern!(py, "Array"))?.call_method1( + arro3_mod.getattr(intern!(py, "Array"))?.call_method1( intern!(py, "from_arrow_pycapsule"), self.__arrow_c_array__(py, None)?, - )?; - core_obj.into_py_any(py) + ) } /// Export this to a Python `nanoarrow.Array`. - pub fn to_nanoarrow(&self, py: Python) -> PyResult { + pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult> { to_nanoarrow_array(py, &self.__arrow_c_array__(py, None)?) } /// Export to a pyarrow.Array /// /// Requires pyarrow >=14 - pub fn to_pyarrow(self, py: Python) -> PyResult { + pub fn to_pyarrow<'py>(&'py self, py: Python<'py>) -> PyResult> { let pyarrow_mod = py.import(intern!(py, "pyarrow"))?; - let pyarrow_obj = pyarrow_mod + let cloned = Self::new(self.array.clone(), self.field.clone()); + pyarrow_mod .getattr(intern!(py, "array"))? - .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)?; - pyarrow_obj.into_py_any(py) + .call1(PyTuple::new(py, vec![cloned.into_pyobject(py)?])?) } } @@ -240,12 +241,12 @@ impl PyArray { #[pyo3(signature = (dtype=None, copy=None))] #[allow(unused_variables)] - fn __array__( - &self, - py: Python, - dtype: Option, - copy: Option, - ) -> PyResult { + fn __array__<'py>( + &'py self, + py: Python<'py>, + dtype: Option>, + copy: Option>, + ) -> PyResult> { to_numpy(py, &self.array) } @@ -255,7 +256,7 @@ impl PyArray { &'py self, py: Python<'py>, requested_schema: Option>, - ) -> PyArrowResult> { + ) -> PyArrowResult> { to_array_pycapsules(py, self.field.clone(), &self.array, requested_schema) } @@ -345,16 +346,16 @@ impl PyArray { Ok(Self::from_array_ref(arrow_array)) } - fn cast(&self, py: Python, target_type: PyField) -> PyArrowResult { + fn cast(&self, target_type: PyField) -> PyArrowResult { let new_field = target_type.into_inner(); let new_array = arrow::compute::cast(self.as_ref(), new_field.data_type())?; - Ok(PyArray::new(new_array, new_field).to_arro3(py)?) + Ok(PyArray::new(new_array, new_field).into()) } #[getter] #[pyo3(name = "field")] - fn py_field(&self, py: Python) -> PyResult { - PyField::new(self.field.clone()).to_arro3(py) + fn py_field(&self) -> Arro3Field { + PyField::new(self.field.clone()).into() } #[getter] @@ -368,18 +369,18 @@ impl PyArray { } #[pyo3(signature = (offset=0, length=None))] - fn slice(&self, py: Python, offset: usize, length: Option) -> PyResult { + fn slice(&self, offset: usize, length: Option) -> Arro3Array { let length = length.unwrap_or_else(|| self.array.len() - offset); let new_array = self.array.slice(offset, length); - PyArray::new(new_array, self.field().clone()).to_arro3(py) + PyArray::new(new_array, self.field().clone()).into() } - fn take(&self, py: Python, indices: PyArray) -> PyArrowResult { + fn take(&self, indices: PyArray) -> PyArrowResult { let new_array = arrow::compute::take(self.as_ref(), indices.as_ref(), None)?; - Ok(PyArray::new(new_array, self.field.clone()).to_arro3(py)?) + Ok(PyArray::new(new_array, self.field.clone()).into()) } - fn to_numpy(&self, py: Python) -> PyResult { + fn to_numpy<'py>(&'py self, py: Python<'py>) -> PyResult> { self.__array__(py, None, None) } @@ -394,7 +395,7 @@ impl PyArray { } #[getter] - fn r#type(&self, py: Python) -> PyResult { - PyDataType::new(self.field.data_type().clone()).to_arro3(py) + fn r#type(&self) -> Arro3DataType { + PyDataType::new(self.field.data_type().clone()).into() } } diff --git a/pyo3-arrow/src/array_reader.rs b/pyo3-arrow/src/array_reader.rs index 3a7a517..386a5ee 100644 --- a/pyo3-arrow/src/array_reader.rs +++ b/pyo3-arrow/src/array_reader.rs @@ -3,11 +3,12 @@ use std::sync::Mutex; use arrow_schema::FieldRef; use pyo3::exceptions::{PyIOError, PyStopIteration, PyValueError}; +use pyo3::intern; use pyo3::prelude::*; use pyo3::types::{PyCapsule, PyTuple, PyType}; -use pyo3::{intern, IntoPyObjectExt}; use crate::error::PyArrowResult; +use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3Field}; use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader; use crate::ffi::from_python::utils::import_stream_pycapsule; use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream; @@ -78,20 +79,17 @@ impl PyArrayReader { /// Export this to a Python `arro3.core.ArrayReader`. #[allow(clippy::wrong_self_convention)] - pub fn to_arro3(&mut self, py: Python) -> PyResult { + pub fn to_arro3<'py>(&'py mut self, py: Python<'py>) -> PyResult> { let arro3_mod = py.import(intern!(py, "arro3.core"))?; - let core_obj = arro3_mod - .getattr(intern!(py, "ArrayReader"))? - .call_method1( - intern!(py, "from_arrow_pycapsule"), - PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?, - )?; - core_obj.into_py_any(py) + arro3_mod.getattr(intern!(py, "ArrayReader"))?.call_method1( + intern!(py, "from_arrow_pycapsule"), + PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?, + ) } /// Export this to a Python `nanoarrow.ArrayStream`. #[allow(clippy::wrong_self_convention)] - pub fn to_nanoarrow(&mut self, py: Python) -> PyResult { + pub fn to_nanoarrow<'py>(&'py mut self, py: Python<'py>) -> PyResult> { to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?) } } @@ -138,12 +136,12 @@ impl PyArrayReader { // Return self // https://stackoverflow.com/a/52056290 - fn __iter__(&mut self, py: Python) -> PyResult { + fn __iter__<'py>(&'py mut self, py: Python<'py>) -> PyResult> { self.to_arro3(py) } - fn __next__(&mut self, py: Python) -> PyArrowResult { - self.read_next_array(py) + fn __next__(&mut self) -> PyArrowResult { + self.read_next_array() } fn __repr__(&self) -> String { @@ -188,11 +186,11 @@ impl PyArrayReader { } #[getter] - fn field(&self, py: Python) -> PyResult { - PyField::new(self.field_ref()?).to_arro3(py) + fn field(&self) -> PyResult { + Ok(PyField::new(self.field_ref()?).into()) } - fn read_all(&mut self, py: Python) -> PyArrowResult { + fn read_all(&mut self) -> PyArrowResult { let stream = self .0 .lock() @@ -204,17 +202,17 @@ impl PyArrayReader { for array in stream { arrays.push(array?); } - Ok(PyChunkedArray::try_new(arrays, field)?.to_arro3(py)?) + Ok(PyChunkedArray::try_new(arrays, field)?.into()) } - fn read_next_array(&mut self, py: Python) -> PyArrowResult { + fn read_next_array(&mut self) -> PyArrowResult { let mut inner = self.0.lock().unwrap(); let stream = inner .as_mut() .ok_or(PyIOError::new_err("Cannot read from closed stream."))?; if let Some(next_batch) = stream.next() { - Ok(PyArray::new(next_batch?, stream.field()).to_arro3(py)?) + Ok(PyArray::new(next_batch?, stream.field()).into()) } else { Err(PyStopIteration::new_err("").into()) } diff --git a/pyo3-arrow/src/buffer.rs b/pyo3-arrow/src/buffer.rs index ad2e7c0..7e6a4f4 100644 --- a/pyo3-arrow/src/buffer.rs +++ b/pyo3-arrow/src/buffer.rs @@ -261,7 +261,6 @@ impl AnyBufferProtocol { /// - This assumes that the Python buffer is immutable. Immutability is not guaranteed by the /// Python buffer protocol, so the end user must uphold this. Mutating a Python buffer could /// lead to undefined behavior. - // Note: in the future, maybe you should check item alignment as well? // https://github.com/PyO3/pyo3/blob/ce18f79d71f4d3eac54f55f7633cf08d2f57b64e/src/buffer.rs#L217-L221 pub fn into_arrow_array(self) -> PyArrowResult { diff --git a/pyo3-arrow/src/chunked.rs b/pyo3-arrow/src/chunked.rs index 3a58d62..60d692d 100644 --- a/pyo3-arrow/src/chunked.rs +++ b/pyo3-arrow/src/chunked.rs @@ -10,6 +10,7 @@ use pyo3::types::{PyCapsule, PyTuple, PyType}; use pyo3::{intern, IntoPyObjectExt}; use crate::error::{PyArrowError, PyArrowResult}; +use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3DataType, Arro3Field}; use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader; use crate::ffi::from_python::utils::import_stream_pycapsule; use crate::ffi::to_python::chunked::ArrayIterator; @@ -23,6 +24,7 @@ use crate::{PyArray, PyDataType, PyField, PyScalar}; /// A Python-facing Arrow chunked array. /// /// This is a wrapper around a [FieldRef] and a `Vec` of [ArrayRef]. +#[derive(Debug)] #[pyclass(module = "arro3.core._core", name = "ChunkedArray", subclass)] pub struct PyChunkedArray { chunks: Vec, @@ -185,19 +187,18 @@ impl PyChunkedArray { } /// Export this to a Python `arro3.core.ChunkedArray`. - pub fn to_arro3(&self, py: Python) -> PyResult { + pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult> { let arro3_mod = py.import(intern!(py, "arro3.core"))?; - let core_obj = arro3_mod + arro3_mod .getattr(intern!(py, "ChunkedArray"))? .call_method1( intern!(py, "from_arrow_pycapsule"), PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?, - )?; - core_obj.into_py_any(py) + ) } /// Export this to a Python `nanoarrow.ArrayStream`. - pub fn to_nanoarrow(&self, py: Python) -> PyResult { + pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult> { to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?) } @@ -211,6 +212,16 @@ impl PyChunkedArray { .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)?; pyarrow_obj.into_py_any(py) } + + pub(crate) fn to_stream_pycapsule<'py>( + py: Python<'py>, + chunks: Vec, + field: FieldRef, + requested_schema: Option>, + ) -> PyArrowResult> { + let array_reader = Box::new(ArrayIterator::new(chunks.into_iter().map(Ok), field)); + to_stream_pycapsule(py, array_reader, requested_schema) + } } impl TryFrom> for PyChunkedArray { @@ -277,18 +288,18 @@ impl PyChunkedArray { #[pyo3(signature = (dtype=None, copy=None))] #[allow(unused_variables)] - fn __array__( - &self, - py: Python, + fn __array__<'py>( + &'py self, + py: Python<'py>, dtype: Option, copy: Option, - ) -> PyResult { + ) -> PyResult> { let chunk_refs = self .chunks .iter() .map(|arr| arr.as_ref()) .collect::>(); - chunked_to_numpy(py, chunk_refs.as_slice()) + chunked_to_numpy(py, chunk_refs) } fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult> { @@ -302,11 +313,12 @@ impl PyChunkedArray { py: Python<'py>, requested_schema: Option>, ) -> PyArrowResult> { - let array_reader = Box::new(ArrayIterator::new( - self.chunks.clone().into_iter().map(Ok), + Self::to_stream_pycapsule( + py, + self.chunks.clone(), self.field.clone(), - )); - to_stream_pycapsule(py, array_reader, requested_schema) + requested_schema, + ) } fn __eq__(&self, other: &PyChunkedArray) -> bool { @@ -355,40 +367,40 @@ impl PyChunkedArray { Self::from_arrow_pycapsule(capsule) } - fn cast(&self, py: Python, target_type: PyField) -> PyArrowResult { + fn cast(&self, target_type: PyField) -> PyArrowResult { let new_field = target_type.into_inner(); let new_chunks = self .chunks .iter() .map(|chunk| arrow::compute::cast(&chunk, new_field.data_type())) .collect::, ArrowError>>()?; - Ok(PyChunkedArray::try_new(new_chunks, new_field)?.to_arro3(py)?) + Ok(PyChunkedArray::try_new(new_chunks, new_field)?.into()) } - fn chunk(&self, py: Python, i: usize) -> PyResult { + fn chunk(&self, i: usize) -> PyResult { let field = self.field().clone(); let array = self .chunks .get(i) .ok_or(PyValueError::new_err("out of index"))? .clone(); - PyArray::new(array, field).to_arro3(py) + Ok(PyArray::new(array, field).into()) } #[getter] #[pyo3(name = "chunks")] - fn chunks_py(&self, py: Python) -> PyResult> { + fn chunks_py(&self) -> Vec { let field = self.field().clone(); self.chunks .iter() - .map(|array| PyArray::new(array.clone(), field.clone()).to_arro3(py)) + .map(|array| PyArray::new(array.clone(), field.clone()).into()) .collect() } - fn combine_chunks(&self, py: Python) -> PyArrowResult { + fn combine_chunks(&self) -> PyArrowResult { let field = self.field().clone(); let arrays: Vec<&dyn Array> = self.chunks.iter().map(|arr| arr.as_ref()).collect(); - Ok(PyArray::new(concat(&arrays)?, field).to_arro3(py)?) + Ok(PyArray::new(concat(&arrays)?, field).into()) } fn equals(&self, other: PyChunkedArray) -> bool { @@ -397,8 +409,8 @@ impl PyChunkedArray { #[getter] #[pyo3(name = "field")] - fn py_field(&self, py: Python) -> PyResult { - PyField::new(self.field.clone()).to_arro3(py) + fn py_field(&self) -> Arro3Field { + PyField::new(self.field.clone()).into() } fn length(&self) -> usize { @@ -426,7 +438,7 @@ impl PyChunkedArray { #[pyo3(signature = (*, max_chunksize=None))] #[pyo3(name = "rechunk")] - fn rechunk_py(&self, py: Python, max_chunksize: Option) -> PyArrowResult { + fn rechunk_py(&self, max_chunksize: Option) -> PyArrowResult { let max_chunksize = max_chunksize.unwrap_or(self.len()); let mut chunk_lengths = vec![]; let mut offset = 0; @@ -435,23 +447,17 @@ impl PyChunkedArray { offset += chunk_length; chunk_lengths.push(chunk_length); } - Ok(self.rechunk(chunk_lengths)?.to_arro3(py)?) + Ok(self.rechunk(chunk_lengths)?.into()) } #[pyo3(signature = (offset=0, length=None))] #[pyo3(name = "slice")] - fn slice_py( - &self, - py: Python, - offset: usize, - length: Option, - ) -> PyArrowResult { + fn slice_py(&self, offset: usize, length: Option) -> PyArrowResult { let length = length.unwrap_or_else(|| self.len() - offset); - let sliced_chunked_array = self.slice(offset, length)?; - Ok(sliced_chunked_array.to_arro3(py)?) + Ok(self.slice(offset, length)?.into()) } - fn to_numpy(&self, py: Python) -> PyResult { + fn to_numpy<'py>(&'py self, py: Python<'py>) -> PyResult> { self.__array__(py, None, None) } @@ -468,7 +474,7 @@ impl PyChunkedArray { } #[getter] - fn r#type(&self, py: Python) -> PyResult { - PyDataType::new(self.field.data_type().clone()).to_arro3(py) + fn r#type(&self) -> Arro3DataType { + PyDataType::new(self.field.data_type().clone()).into() } } diff --git a/pyo3-arrow/src/datatypes.rs b/pyo3-arrow/src/datatypes.rs index 49e8d72..f8ade06 100644 --- a/pyo3-arrow/src/datatypes.rs +++ b/pyo3-arrow/src/datatypes.rs @@ -4,11 +4,12 @@ use std::sync::Arc; use arrow::datatypes::DataType; use arrow_schema::{Field, IntervalUnit, TimeUnit}; use pyo3::exceptions::{PyTypeError, PyValueError}; +use pyo3::intern; use pyo3::prelude::*; use pyo3::types::{PyCapsule, PyTuple, PyType}; -use pyo3::{intern, IntoPyObjectExt}; use crate::error::PyArrowResult; +use crate::export::Arro3DataType; use crate::ffi::from_python::utils::import_schema_pycapsule; use crate::ffi::to_python::nanoarrow::to_nanoarrow_schema; use crate::ffi::to_schema_pycapsule; @@ -53,22 +54,21 @@ impl PyDataType { self.0 } - /// Export this to a Python `arro3.core.Field`. - pub fn to_arro3(&self, py: Python) -> PyResult { + /// Export this to a Python `arro3.core.DataType`. + pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult> { let arro3_mod = py.import(intern!(py, "arro3.core"))?; - let core_obj = arro3_mod.getattr(intern!(py, "DataType"))?.call_method1( + arro3_mod.getattr(intern!(py, "DataType"))?.call_method1( intern!(py, "from_arrow_pycapsule"), PyTuple::new(py, vec![self.__arrow_c_schema__(py)?])?, - )?; - core_obj.into_py_any(py) + ) } /// Export this to a Python `nanoarrow.Schema`. - pub fn to_nanoarrow(&self, py: Python) -> PyResult { + pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult> { to_nanoarrow_schema(py, &self.__arrow_c_schema__(py)?) } - /// Export to a pyarrow.Field + /// Export to a pyarrow.DataType /// /// Requires pyarrow >=14 pub fn to_pyarrow(self, py: Python) -> PyResult { @@ -120,7 +120,10 @@ impl Display for PyDataType { #[allow(non_snake_case)] #[pymethods] impl PyDataType { - fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult> { + pub(crate) fn __arrow_c_schema__<'py>( + &'py self, + py: Python<'py>, + ) -> PyArrowResult> { to_schema_pycapsule(py, &self.0) } @@ -235,20 +238,20 @@ impl PyDataType { } #[getter] - fn value_type(&self, py: Python) -> PyResult> { + fn value_type(&self) -> Option { match &self.0 { DataType::FixedSizeList(value_field, _) | DataType::List(value_field) | DataType::LargeList(value_field) | DataType::ListView(value_field) | DataType::LargeListView(value_field) - | DataType::RunEndEncoded(_, value_field) => Ok(Some( - PyDataType::new(value_field.data_type().clone()).to_arro3(py)?, - )), + | DataType::RunEndEncoded(_, value_field) => { + Some(PyDataType::new(value_field.data_type().clone()).into()) + } DataType::Dictionary(_key_type, value_type) => { - Ok(Some(PyDataType::new(*value_type.clone()).to_arro3(py)?)) + Some(PyDataType::new(*value_type.clone()).into()) } - _ => Ok(None), + _ => None, } } diff --git a/pyo3-arrow/src/export.rs b/pyo3-arrow/src/export.rs new file mode 100644 index 0000000..110e150 --- /dev/null +++ b/pyo3-arrow/src/export.rs @@ -0,0 +1,349 @@ +//! Wrappers around objects defined in this crate to simplify returning data to `arro3-core`. +//! +//! By default, if you return something like a `PyArray` from your Python function, it will work +//! because `PyArray` implements `#[pyclass]`, but it will statically link the private methods +//! defined on `PyArray` in your given version of `pyo3-arrow`. +//! +//! This isn't ideal for a few reasons. For one, this means that the actual classes returned from +//! multiple packages will be _different_. This also means that any updates in the latest `arro3` +//! version won't be reflected in your exported classes. +//! +//! Instead, because Arrow is an ABI-stable format, it's easy to _dynamically_ link the data. So we +//! can pass Arrow data at runtime to whatever version of `arro3-core` the user has in their Python +//! environment. +//! +//! Because each of the objects in this module implements `[IntoPyObject]`, you can return these +//! objects directly. +//! +//! ```notest +//! /// A function that will automatically return +//! #[pyfunction] +//! fn my_function() -> pyo3_arrow::export::Arro3Array { +//! todo!() +//! } +//! ``` +//! +//! Note that this means you must require `arro3-core` as a Python dependency in the +//! `pyproject.toml` of your Rust-Python library. + +use std::sync::Arc; + +use arrow_array::{RecordBatch, RecordBatchReader}; +use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef}; +use pyo3::intern; +use pyo3::prelude::*; +use pyo3::types::PyTuple; + +use crate::ffi::{to_array_pycapsules, to_schema_pycapsule}; +use crate::{ + PyArray, PyChunkedArray, PyDataType, PyField, PyRecordBatch, PyRecordBatchReader, PyScalar, + PySchema, PyTable, +}; + +/// A wrapper around a [PyArray] that implements [IntoPyObject] to convert to a runtime-available +/// `arro3.core.Array`. +/// +/// This ensures that we return data with the **user's** runtime-provided (dynamically-linked) +/// `arro3.core.Array` and not the one statically linked from Rust. +#[derive(Debug)] +pub struct Arro3Array(PyArray); + +impl From for Arro3Array { + fn from(value: PyArray) -> Self { + Self(value) + } +} + +impl<'py> IntoPyObject<'py> for Arro3Array { + type Target = PyAny; + type Output = Bound<'py, PyAny>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let arro3_mod = py.import(intern!(py, "arro3.core"))?; + arro3_mod.getattr(intern!(py, "Array"))?.call_method1( + intern!(py, "from_arrow_pycapsule"), + to_array_pycapsules(py, self.0.field().clone(), &self.0.array(), None)?, + ) + } +} + +/// A wrapper around a [PyChunkedArray] that implements [IntoPyObject] to convert to a +/// runtime-available `arro3.core.ChunkedArray`. +/// +/// This ensures that we return data with the **user's** runtime-provided (dynamically-linked) +/// `arro3.core.ChunkedArray` and not the one statically linked from Rust. +#[derive(Debug)] +pub struct Arro3ChunkedArray(PyChunkedArray); + +impl From for Arro3ChunkedArray { + fn from(value: PyChunkedArray) -> Self { + Self(value) + } +} + +impl<'py> IntoPyObject<'py> for Arro3ChunkedArray { + type Target = PyAny; + type Output = Bound<'py, PyAny>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let capsule = PyChunkedArray::to_stream_pycapsule( + py, + self.0.chunks().to_vec(), + self.0.field().clone(), + None, + )?; + + let arro3_mod = py.import(intern!(py, "arro3.core"))?; + arro3_mod + .getattr(intern!(py, "ChunkedArray"))? + .call_method1( + intern!(py, "from_arrow_pycapsule"), + PyTuple::new(py, vec![capsule])?, + ) + } +} + +/// A wrapper around a [PyField] that implements [IntoPyObject] to convert to a runtime-available +/// `arro3.core.Field`. +/// +/// This ensures that we return data with the **user's** runtime-provided (dynamically-linked) +/// `arro3.core.Field` and not the one statically linked from Rust. +#[derive(Debug)] +pub struct Arro3Field(PyField); + +impl From for Arro3Field { + fn from(value: PyField) -> Self { + Self(value) + } +} + +impl From for Arro3Field { + fn from(value: FieldRef) -> Self { + Self(value.into()) + } +} + +impl From<&Field> for Arro3Field { + fn from(value: &Field) -> Self { + Self(Arc::new(value.clone()).into()) + } +} + +impl<'py> IntoPyObject<'py> for Arro3Field { + type Target = PyAny; + type Output = Bound<'py, PyAny>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let arro3_mod = py.import(intern!(py, "arro3.core"))?; + arro3_mod.getattr(intern!(py, "Field"))?.call_method1( + intern!(py, "from_arrow_pycapsule"), + PyTuple::new(py, vec![to_schema_pycapsule(py, self.0.as_ref())?])?, + ) + } +} + +/// A wrapper around a [PyDataType] that implements [IntoPyObject] to convert to a +/// runtime-available `arro3.core.DataType`. +/// +/// This ensures that we return data with the **user's** runtime-provided (dynamically-linked) +/// `arro3.core.DataType` and not the one statically linked from Rust. +#[derive(Debug)] +pub struct Arro3DataType(PyDataType); + +impl From for Arro3DataType { + fn from(value: PyDataType) -> Self { + Self(value) + } +} + +impl From for Arro3DataType { + fn from(value: DataType) -> Self { + Self(PyDataType::new(value)) + } +} + +impl<'py> IntoPyObject<'py> for Arro3DataType { + type Target = PyAny; + type Output = Bound<'py, PyAny>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let arro3_mod = py.import(intern!(py, "arro3.core"))?; + arro3_mod.getattr(intern!(py, "DataType"))?.call_method1( + intern!(py, "from_arrow_pycapsule"), + PyTuple::new(py, vec![to_schema_pycapsule(py, self.0.as_ref())?])?, + ) + } +} + +/// A wrapper around a [PyRecordBatch] that implements [IntoPyObject] to convert to a +/// runtime-available `arro3.core.RecordBatch`. +/// +/// This ensures that we return data with the **user's** runtime-provided (dynamically-linked) +/// `arro3.core.RecordBatch` and not the one statically linked from Rust. +#[derive(Debug)] +pub struct Arro3RecordBatch(PyRecordBatch); + +impl From for Arro3RecordBatch { + fn from(value: PyRecordBatch) -> Self { + Self(value) + } +} + +impl From for Arro3RecordBatch { + fn from(value: RecordBatch) -> Self { + Self(value.into()) + } +} + +impl<'py> IntoPyObject<'py> for Arro3RecordBatch { + type Target = PyAny; + type Output = Bound<'py, PyAny>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let arro3_mod = py.import(intern!(py, "arro3.core"))?; + let capsules = PyRecordBatch::to_array_pycapsules(py, self.0.into_inner(), None)?; + arro3_mod + .getattr(intern!(py, "RecordBatch"))? + .call_method1(intern!(py, "from_arrow_pycapsule"), capsules) + } +} + +/// A wrapper around a [PyRecordBatchReader] that implements [IntoPyObject] to convert to a +/// runtime-available `arro3.core.RecordBatchReader`. +/// +/// This ensures that we return data with the **user's** runtime-provided (dynamically-linked) +/// `arro3.core.RecordBatchReader` and not the one statically linked from Rust. +pub struct Arro3RecordBatchReader(PyRecordBatchReader); + +impl From for Arro3RecordBatchReader { + fn from(value: PyRecordBatchReader) -> Self { + Self(value) + } +} + +impl From> for Arro3RecordBatchReader { + fn from(value: Box) -> Self { + Self(PyRecordBatchReader::new(value)) + } +} + +impl<'py> IntoPyObject<'py> for Arro3RecordBatchReader { + type Target = PyAny; + type Output = Bound<'py, PyAny>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let arro3_mod = py.import(intern!(py, "arro3.core"))?; + let capsule = PyRecordBatchReader::to_stream_pycapsule(py, self.0.into_reader()?, None)?; + arro3_mod + .getattr(intern!(py, "RecordBatchReader"))? + .call_method1( + intern!(py, "from_arrow_pycapsule"), + PyTuple::new(py, vec![capsule])?, + ) + } +} + +/// A wrapper around a [PyScalar] that implements [IntoPyObject] to convert to a +/// runtime-available `arro3.core.Scalar`. +/// +/// This ensures that we return data with the **user's** runtime-provided (dynamically-linked) +/// `arro3.core.Scalar` and not the one statically linked from Rust. +#[derive(Debug)] +pub struct Arro3Scalar(PyScalar); + +impl From for Arro3Scalar { + fn from(value: PyScalar) -> Self { + Self(value) + } +} + +impl<'py> IntoPyObject<'py> for Arro3Scalar { + type Target = PyAny; + type Output = Bound<'py, PyAny>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let capsules = to_array_pycapsules(py, self.0.field().clone(), &self.0.array(), None)?; + + let arro3_mod = py.import(intern!(py, "arro3.core"))?; + arro3_mod + .getattr(intern!(py, "Scalar"))? + .call_method1(intern!(py, "from_arrow_pycapsule"), capsules) + } +} + +/// A wrapper around a [PySchema] that implements [IntoPyObject] to convert to a +/// runtime-available `arro3.core.Schema`. +/// +/// This ensures that we return data with the **user's** runtime-provided (dynamically-linked) +/// `arro3.core.Schema` and not the one statically linked from Rust. +#[derive(Debug)] +pub struct Arro3Schema(PySchema); + +impl From for Arro3Schema { + fn from(value: PySchema) -> Self { + Self(value) + } +} + +impl From for Arro3Schema { + fn from(value: SchemaRef) -> Self { + Self(PySchema::new(value)) + } +} + +impl From for Arro3Schema { + fn from(value: Schema) -> Self { + Self(PySchema::new(Arc::new(value))) + } +} + +impl<'py> IntoPyObject<'py> for Arro3Schema { + type Target = PyAny; + type Output = Bound<'py, PyAny>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let arro3_mod = py.import(intern!(py, "arro3.core"))?; + arro3_mod.getattr(intern!(py, "Schema"))?.call_method1( + intern!(py, "from_arrow_pycapsule"), + PyTuple::new(py, vec![to_schema_pycapsule(py, self.0.as_ref())?])?, + ) + } +} + +/// A wrapper around a [PyTable] that implements [IntoPyObject] to convert to a +/// runtime-available `arro3.core.Table`. +/// +/// This ensures that we return data with the **user's** runtime-provided (dynamically-linked) +/// `arro3.core.Table` and not the one statically linked from Rust. +#[derive(Debug)] +pub struct Arro3Table(PyTable); + +impl From for Arro3Table { + fn from(value: PyTable) -> Self { + Self(value) + } +} + +impl<'py> IntoPyObject<'py> for Arro3Table { + type Target = PyAny; + type Output = Bound<'py, PyAny>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let arro3_mod = py.import(intern!(py, "arro3.core"))?; + let (batches, schema) = self.0.into_inner(); + let capsule = PyTable::to_stream_pycapsule(py, batches, schema, None)?; + arro3_mod.getattr(intern!(py, "Table"))?.call_method1( + intern!(py, "from_arrow_pycapsule"), + PyTuple::new(py, vec![capsule])?, + ) + } +} diff --git a/pyo3-arrow/src/ffi/to_python/nanoarrow.rs b/pyo3-arrow/src/ffi/to_python/nanoarrow.rs index fbaf374..2ee59a3 100644 --- a/pyo3-arrow/src/ffi/to_python/nanoarrow.rs +++ b/pyo3-arrow/src/ffi/to_python/nanoarrow.rs @@ -1,25 +1,31 @@ +use pyo3::intern; use pyo3::prelude::*; use pyo3::types::{PyCapsule, PyTuple}; -use pyo3::{intern, IntoPyObjectExt}; -pub fn to_nanoarrow_schema(py: Python, capsule: &Bound<'_, PyCapsule>) -> PyResult { +pub fn to_nanoarrow_schema<'py>( + py: Python<'py>, + capsule: &Bound<'py, PyCapsule>, +) -> PyResult> { let na_mod = py.import(intern!(py, "nanoarrow"))?; - let pyarrow_obj = na_mod + na_mod .getattr(intern!(py, "Schema"))? - .call1(PyTuple::new(py, vec![capsule])?)?; - pyarrow_obj.into_py_any(py) + .call1(PyTuple::new(py, vec![capsule])?) } -pub fn to_nanoarrow_array(py: Python, capsules: &Bound<'_, PyTuple>) -> PyResult { +pub fn to_nanoarrow_array<'py>( + py: Python<'py>, + capsules: &Bound<'py, PyTuple>, +) -> PyResult> { let na_mod = py.import(intern!(py, "nanoarrow"))?; - let pyarrow_obj = na_mod.getattr(intern!(py, "Array"))?.call1(capsules)?; - pyarrow_obj.into_py_any(py) + na_mod.getattr(intern!(py, "Array"))?.call1(capsules) } -pub fn to_nanoarrow_array_stream(py: Python, capsule: &Bound<'_, PyCapsule>) -> PyResult { +pub fn to_nanoarrow_array_stream<'py>( + py: Python<'py>, + capsule: &Bound<'py, PyCapsule>, +) -> PyResult> { let na_mod = py.import(intern!(py, "nanoarrow"))?; - let pyarrow_obj = na_mod + na_mod .getattr(intern!(py, "ArrayStream"))? - .call1(PyTuple::new(py, vec![capsule])?)?; - pyarrow_obj.into_py_any(py) + .call1(PyTuple::new(py, vec![capsule])?) } diff --git a/pyo3-arrow/src/field.rs b/pyo3-arrow/src/field.rs index 3311ca0..f0e31ef 100644 --- a/pyo3-arrow/src/field.rs +++ b/pyo3-arrow/src/field.rs @@ -4,11 +4,12 @@ use std::sync::Arc; use arrow_schema::{Field, FieldRef}; use pyo3::exceptions::PyTypeError; +use pyo3::intern; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyCapsule, PyDict, PyTuple, PyType}; -use pyo3::{intern, IntoPyObjectExt}; use crate::error::PyArrowResult; +use crate::export::{Arro3DataType, Arro3Field}; use crate::ffi::from_python::utils::import_schema_pycapsule; use crate::ffi::to_python::nanoarrow::to_nanoarrow_schema; use crate::ffi::to_python::to_schema_pycapsule; @@ -18,6 +19,7 @@ use crate::PyDataType; /// A Python-facing Arrow field. /// /// This is a wrapper around a [FieldRef]. +#[derive(Debug)] #[pyclass(module = "arro3.core._core", name = "Field", subclass)] pub struct PyField(FieldRef); @@ -41,29 +43,28 @@ impl PyField { } /// Export this to a Python `arro3.core.Field`. - pub fn to_arro3(&self, py: Python) -> PyResult { + pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult> { let arro3_mod = py.import(intern!(py, "arro3.core"))?; - let core_obj = arro3_mod.getattr(intern!(py, "Field"))?.call_method1( + arro3_mod.getattr(intern!(py, "Field"))?.call_method1( intern!(py, "from_arrow_pycapsule"), PyTuple::new(py, vec![self.__arrow_c_schema__(py)?])?, - )?; - core_obj.into_py_any(py) + ) } /// Export this to a Python `nanoarrow.Schema`. - pub fn to_nanoarrow(&self, py: Python) -> PyResult { + pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult> { to_nanoarrow_schema(py, &self.__arrow_c_schema__(py)?) } /// Export to a pyarrow.Field /// /// Requires pyarrow >=14 - pub fn to_pyarrow(self, py: Python) -> PyResult { + pub fn to_pyarrow<'py>(&'py self, py: Python<'py>) -> PyResult> { let pyarrow_mod = py.import(intern!(py, "pyarrow"))?; - let pyarrow_obj = pyarrow_mod + let cloned = PyField::new(self.0.clone()); + pyarrow_mod .getattr(intern!(py, "field"))? - .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)?; - pyarrow_obj.into_py_any(py) + .call1(PyTuple::new(py, vec![cloned.into_pyobject(py)?])?) } } @@ -170,7 +171,7 @@ impl PyField { self.0.is_nullable() } - fn remove_metadata(&self, py: Python) -> PyResult { + fn remove_metadata(&self) -> Arro3Field { PyField::new( self.0 .as_ref() @@ -178,34 +179,34 @@ impl PyField { .with_metadata(Default::default()) .into(), ) - .to_arro3(py) + .into() } #[getter] - fn r#type(&self, py: Python) -> PyResult { - PyDataType::new(self.0.data_type().clone()).to_arro3(py) + fn r#type(&self) -> Arro3DataType { + PyDataType::new(self.0.data_type().clone()).into() } - fn with_metadata(&self, py: Python, metadata: MetadataInput) -> PyResult { - PyField::new( + fn with_metadata(&self, metadata: MetadataInput) -> PyResult { + Ok(PyField::new( self.0 .as_ref() .clone() .with_metadata(metadata.into_string_hashmap()?) .into(), ) - .to_arro3(py) + .into()) } - fn with_name(&self, py: Python, name: String) -> PyResult { - PyField::new(self.0.as_ref().clone().with_name(name).into()).to_arro3(py) + fn with_name(&self, name: String) -> Arro3Field { + PyField::new(self.0.as_ref().clone().with_name(name).into()).into() } - fn with_nullable(&self, py: Python, nullable: bool) -> PyResult { - PyField::new(self.0.as_ref().clone().with_nullable(nullable).into()).to_arro3(py) + fn with_nullable(&self, nullable: bool) -> Arro3Field { + PyField::new(self.0.as_ref().clone().with_nullable(nullable).into()).into() } - fn with_type(&self, py: Python, new_type: PyDataType) -> PyResult { + fn with_type(&self, new_type: PyDataType) -> Arro3Field { PyField::new( self.0 .as_ref() @@ -213,6 +214,6 @@ impl PyField { .with_data_type(new_type.into_inner()) .into(), ) - .to_arro3(py) + .into() } } diff --git a/pyo3-arrow/src/interop/numpy/to_numpy.rs b/pyo3-arrow/src/interop/numpy/to_numpy.rs index 1688d0a..6615286 100644 --- a/pyo3-arrow/src/interop/numpy/to_numpy.rs +++ b/pyo3-arrow/src/interop/numpy/to_numpy.rs @@ -4,10 +4,11 @@ use arrow_array::Array; use arrow_schema::DataType; use numpy::ToPyArray; use pyo3::exceptions::{PyNotImplementedError, PyValueError}; +use pyo3::prelude::*; use pyo3::types::{PyAnyMethods, PyBytes, PyDict, PyList, PyString, PyTuple}; -use pyo3::{intern, IntoPyObjectExt, PyObject, PyResult, Python}; +use pyo3::{intern, PyResult, Python}; -pub fn to_numpy(py: Python, arr: &dyn Array) -> PyResult { +pub fn to_numpy<'py>(py: Python<'py>, arr: &'py dyn Array) -> PyResult> { if arr.null_count() > 0 { return Err(PyValueError::new_err( "Cannot create numpy array from arrow array with nulls.", @@ -19,7 +20,7 @@ pub fn to_numpy(py: Python, arr: &dyn Array) -> PyResult { arr.as_primitive::<$arrow_type>() .values() .to_pyarray(py) - .into_py_any(py)? + .into_any() }; } @@ -37,7 +38,7 @@ pub fn to_numpy(py: Python, arr: &dyn Array) -> PyResult { DataType::Int64 => impl_primitive!(Int64Type), DataType::Boolean => { let bools = arr.as_boolean().values().iter().collect::>(); - bools.to_pyarray(py).into_py_any(py)? + bools.to_pyarray(py).into_any() } // For other data types we create Python objects and then create an object-typed numpy // array @@ -50,12 +51,11 @@ pub fn to_numpy(py: Python, arr: &dyn Array) -> PyResult { let numpy_mod = py.import(intern!(py, "numpy"))?; let kwargs = PyDict::new(py); kwargs.set_item("dtype", numpy_mod.getattr(intern!(py, "object_"))?)?; - let np_arr = numpy_mod.call_method( + numpy_mod.call_method( intern!(py, "array"), PyTuple::new(py, vec![py_list])?, Some(&kwargs), - )?; - np_arr.into() + )? } DataType::LargeBinary => { let mut py_bytes = Vec::with_capacity(arr.len()); @@ -66,12 +66,11 @@ pub fn to_numpy(py: Python, arr: &dyn Array) -> PyResult { let numpy_mod = py.import(intern!(py, "numpy"))?; let kwargs = PyDict::new(py); kwargs.set_item("dtype", numpy_mod.getattr(intern!(py, "object_"))?)?; - let np_arr = numpy_mod.call_method( + numpy_mod.call_method( intern!(py, "array"), PyTuple::new(py, vec![py_list])?, Some(&kwargs), - )?; - np_arr.into() + )? } DataType::Utf8 => { let mut py_bytes = Vec::with_capacity(arr.len()); @@ -82,12 +81,11 @@ pub fn to_numpy(py: Python, arr: &dyn Array) -> PyResult { let numpy_mod = py.import(intern!(py, "numpy"))?; let kwargs = PyDict::new(py); kwargs.set_item("dtype", numpy_mod.getattr(intern!(py, "object_"))?)?; - let np_arr = numpy_mod.call_method( + numpy_mod.call_method( intern!(py, "array"), PyTuple::new(py, vec![py_list])?, Some(&kwargs), - )?; - np_arr.into() + )? } DataType::LargeUtf8 => { let mut py_bytes = Vec::with_capacity(arr.len()); @@ -98,12 +96,11 @@ pub fn to_numpy(py: Python, arr: &dyn Array) -> PyResult { let numpy_mod = py.import(intern!(py, "numpy"))?; let kwargs = PyDict::new(py); kwargs.set_item("dtype", numpy_mod.getattr(intern!(py, "object_"))?)?; - let np_arr = numpy_mod.call_method( + numpy_mod.call_method( intern!(py, "array"), PyTuple::new(py, vec![py_list])?, Some(&kwargs), - )?; - np_arr.into() + )? } dt => { return Err(PyNotImplementedError::new_err(format!( @@ -114,14 +111,15 @@ pub fn to_numpy(py: Python, arr: &dyn Array) -> PyResult { Ok(result) } -pub fn chunked_to_numpy(py: Python, arrs: &[&dyn Array]) -> PyResult { +pub fn chunked_to_numpy<'py>( + py: Python<'py>, + arrs: Vec<&'py dyn Array>, +) -> PyResult> { let py_arrays = arrs .iter() .map(|arr| to_numpy(py, *arr)) .collect::>>()?; let numpy_mod = py.import(intern!(py, "numpy"))?; - numpy_mod - .call_method1(intern!(py, "concatenate"), (py_arrays,))? - .into_py_any(py) + numpy_mod.call_method1(intern!(py, "concatenate"), (py_arrays,)) } diff --git a/pyo3-arrow/src/lib.rs b/pyo3-arrow/src/lib.rs index a2c53b2..cb5e44f 100644 --- a/pyo3-arrow/src/lib.rs +++ b/pyo3-arrow/src/lib.rs @@ -8,6 +8,7 @@ pub mod buffer; mod chunked; mod datatypes; pub mod error; +pub mod export; pub mod ffi; mod field; pub mod input; diff --git a/pyo3-arrow/src/record_batch.rs b/pyo3-arrow/src/record_batch.rs index 7eea5a5..8e86d49 100644 --- a/pyo3-arrow/src/record_batch.rs +++ b/pyo3-arrow/src/record_batch.rs @@ -12,6 +12,7 @@ use pyo3::types::{PyCapsule, PyTuple, PyType}; use pyo3::{intern, IntoPyObjectExt}; use crate::error::PyArrowResult; +use crate::export::{Arro3Array, Arro3Field, Arro3RecordBatch, Arro3Schema}; use crate::ffi::from_python::utils::import_array_pycapsules; use crate::ffi::to_python::nanoarrow::to_nanoarrow_array; use crate::ffi::to_python::to_array_pycapsules; @@ -81,19 +82,16 @@ impl PyRecordBatch { } /// Export this to a Python `arro3.core.RecordBatch`. - pub fn to_arro3(&self, py: Python) -> PyResult { + pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult> { let arro3_mod = py.import(intern!(py, "arro3.core"))?; - let core_obj = arro3_mod - .getattr(intern!(py, "RecordBatch"))? - .call_method1( - intern!(py, "from_arrow_pycapsule"), - self.__arrow_c_array__(py, None)?, - )?; - core_obj.into_py_any(py) + arro3_mod.getattr(intern!(py, "RecordBatch"))?.call_method1( + intern!(py, "from_arrow_pycapsule"), + self.__arrow_c_array__(py, None)?, + ) } /// Export this to a Python `nanoarrow.Array`. - pub fn to_nanoarrow(&self, py: Python) -> PyResult { + pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult> { to_nanoarrow_array(py, &self.__arrow_c_array__(py, None)?) } @@ -107,6 +105,16 @@ impl PyRecordBatch { .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)?; pyarrow_obj.into_py_any(py) } + + pub(crate) fn to_array_pycapsules<'py>( + py: Python<'py>, + record_batch: RecordBatch, + requested_schema: Option>, + ) -> PyArrowResult> { + let field = Field::new_struct("", record_batch.schema_ref().fields().clone(), false); + let array: ArrayRef = Arc::new(StructArray::from(record_batch.clone())); + to_array_pycapsules(py, field.into(), &array, requested_schema) + } } impl From for PyRecordBatch { @@ -172,9 +180,7 @@ impl PyRecordBatch { py: Python<'py>, requested_schema: Option>, ) -> PyArrowResult> { - let field = Field::new_struct("", self.0.schema_ref().fields().clone(), false); - let array: ArrayRef = Arc::new(StructArray::from(self.0.clone())); - to_array_pycapsules(py, field.into(), &array, requested_schema) + Self::to_array_pycapsules(py, self.0.clone(), requested_schema) } fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult> { @@ -185,8 +191,8 @@ impl PyRecordBatch { self.0 == other.0 } - fn __getitem__(&self, py: Python, key: FieldIndexInput) -> PyResult { - self.column(py, key) + fn __getitem__(&self, key: FieldIndexInput) -> PyResult { + self.column(key) } fn __repr__(&self) -> String { @@ -272,11 +278,10 @@ impl PyRecordBatch { fn add_column( &self, - py: Python, i: usize, field: NameOrField, column: PyArray, - ) -> PyArrowResult { + ) -> PyArrowResult { let mut fields = self.0.schema_ref().fields().to_vec(); fields.insert(i, field.into_field(column.field())); let schema = Schema::new_with_metadata(fields, self.0.schema_ref().metadata().clone()); @@ -285,15 +290,14 @@ impl PyRecordBatch { arrays.insert(i, column.array().clone()); let new_rb = RecordBatch::try_new(schema.into(), arrays)?; - Ok(PyRecordBatch::new(new_rb).to_arro3(py)?) + Ok(PyRecordBatch::new(new_rb).into()) } fn append_column( &self, - py: Python, field: NameOrField, column: PyArray, - ) -> PyArrowResult { + ) -> PyArrowResult { let mut fields = self.0.schema_ref().fields().to_vec(); fields.push(field.into_field(column.field())); let schema = Schema::new_with_metadata(fields, self.0.schema_ref().metadata().clone()); @@ -302,14 +306,14 @@ impl PyRecordBatch { arrays.push(column.array().clone()); let new_rb = RecordBatch::try_new(schema.into(), arrays)?; - Ok(PyRecordBatch::new(new_rb).to_arro3(py)?) + Ok(PyRecordBatch::new(new_rb).into()) } - fn column(&self, py: Python, i: FieldIndexInput) -> PyResult { + fn column(&self, i: FieldIndexInput) -> PyResult { let column_index = i.into_position(self.0.schema_ref())?; let field = self.0.schema().field(column_index).clone(); let array = self.0.column(column_index).clone(); - PyArray::new(array, field.into()).to_arro3(py) + Ok(PyArray::new(array, field.into()).into()) } #[getter] @@ -323,9 +327,9 @@ impl PyRecordBatch { } #[getter] - fn columns(&self, py: Python) -> PyResult> { + fn columns(&self) -> PyResult> { (0..self.num_columns()) - .map(|i| self.column(py, FieldIndexInput::Position(i))) + .map(|i| self.column(FieldIndexInput::Position(i))) .collect() } @@ -333,10 +337,10 @@ impl PyRecordBatch { self.0 == other.0 } - fn field(&self, py: Python, i: FieldIndexInput) -> PyResult { + fn field(&self, i: FieldIndexInput) -> PyResult { let schema_ref = self.0.schema_ref(); let field = schema_ref.field(i.into_position(schema_ref)?); - PyField::new(field.clone().into()).to_arro3(py) + Ok(PyField::new(field.clone().into()).into()) } #[getter] @@ -354,30 +358,28 @@ impl PyRecordBatch { self.0.num_rows() } - fn remove_column(&self, py: Python, i: usize) -> PyResult { + fn remove_column(&self, i: usize) -> Arro3RecordBatch { let mut rb = self.0.clone(); rb.remove_column(i); - PyRecordBatch::new(rb).to_arro3(py) + PyRecordBatch::new(rb).into() } #[getter] - fn schema(&self, py: Python) -> PyResult { - PySchema::new(self.0.schema()).to_arro3(py) + fn schema(&self) -> Arro3Schema { + self.0.schema().into() } - fn select(&self, py: Python, columns: SelectIndices) -> PyArrowResult { + fn select(&self, columns: SelectIndices) -> PyArrowResult { let positions = columns.into_positions(self.0.schema_ref().fields())?; - let new_rb = self.0.project(&positions)?; - Ok(PyRecordBatch::new(new_rb).to_arro3(py)?) + Ok(self.0.project(&positions)?.into()) } fn set_column( &self, - py: Python, i: usize, field: NameOrField, column: PyArray, - ) -> PyArrowResult { + ) -> PyArrowResult { let mut fields = self.0.schema_ref().fields().to_vec(); fields[i] = field.into_field(column.field()); let schema = Schema::new_with_metadata(fields, self.0.schema_ref().metadata().clone()); @@ -385,8 +387,7 @@ impl PyRecordBatch { let mut arrays = self.0.columns().to_vec(); arrays[i] = column.array().clone(); - let new_rb = RecordBatch::try_new(schema.into(), arrays)?; - Ok(PyRecordBatch::new(new_rb).to_arro3(py)?) + Ok(RecordBatch::try_new(schema.into(), arrays)?.into()) } #[getter] @@ -395,26 +396,26 @@ impl PyRecordBatch { } #[pyo3(signature = (offset=0, length=None))] - fn slice(&self, py: Python, offset: usize, length: Option) -> PyResult { + fn slice(&self, offset: usize, length: Option) -> Arro3RecordBatch { let length = length.unwrap_or_else(|| self.num_rows() - offset); - PyRecordBatch::new(self.0.slice(offset, length)).to_arro3(py) + self.0.slice(offset, length).into() } - fn take(&self, py: Python, indices: PyArray) -> PyArrowResult { + fn take(&self, indices: PyArray) -> PyArrowResult { let new_batch = take_record_batch(self.as_ref(), indices.as_ref())?; - Ok(PyRecordBatch::new(new_batch).to_arro3(py)?) + Ok(new_batch.into()) } - fn to_struct_array(&self, py: Python) -> PyArrowResult { + fn to_struct_array(&self) -> Arro3Array { let struct_array: StructArray = self.0.clone().into(); let field = Field::new_struct("", self.0.schema_ref().fields().clone(), false) .with_metadata(self.0.schema_ref().metadata.clone()); - Ok(PyArray::new(Arc::new(struct_array), field.into()).to_arro3(py)?) + PyArray::new(Arc::new(struct_array), field.into()).into() } - fn with_schema(&self, py: Python, schema: PySchema) -> PyArrowResult { + fn with_schema(&self, schema: PySchema) -> PyArrowResult { let new_schema = schema.into_inner(); let new_batch = RecordBatch::try_new(new_schema.clone(), self.0.columns().to_vec())?; - Ok(PyRecordBatch::new(new_batch).to_arro3(py)?) + Ok(new_batch.into()) } } diff --git a/pyo3-arrow/src/record_batch_reader.rs b/pyo3-arrow/src/record_batch_reader.rs index 281f0ba..7012d43 100644 --- a/pyo3-arrow/src/record_batch_reader.rs +++ b/pyo3-arrow/src/record_batch_reader.rs @@ -9,6 +9,7 @@ use pyo3::types::{PyCapsule, PyTuple, PyType}; use pyo3::{intern, IntoPyObjectExt}; use crate::error::PyArrowResult; +use crate::export::{Arro3RecordBatch, Arro3Schema, Arro3Table}; use crate::ffi::from_python::utils::import_stream_pycapsule; use crate::ffi::to_python::chunked::ArrayIterator; use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream; @@ -80,19 +81,18 @@ impl PyRecordBatchReader { } /// Export this to a Python `arro3.core.RecordBatchReader`. - pub fn to_arro3(&mut self, py: Python) -> PyResult { + pub fn to_arro3<'py>(&'py mut self, py: Python<'py>) -> PyResult> { let arro3_mod = py.import(intern!(py, "arro3.core"))?; - let core_obj = arro3_mod + arro3_mod .getattr(intern!(py, "RecordBatchReader"))? .call_method1( intern!(py, "from_arrow_pycapsule"), PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?, - )?; - core_obj.into_py_any(py) + ) } /// Export this to a Python `nanoarrow.ArrayStream`. - pub fn to_nanoarrow(&mut self, py: Python) -> PyResult { + pub fn to_nanoarrow<'py>(&'py mut self, py: Python<'py>) -> PyResult> { to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?) } @@ -108,6 +108,25 @@ impl PyRecordBatchReader { )?; pyarrow_obj.into_py_any(py) } + + pub(crate) fn to_stream_pycapsule<'py>( + py: Python<'py>, + reader: Box, + requested_schema: Option>, + ) -> PyArrowResult> { + let schema = reader.schema().clone(); + let array_reader = reader.into_iter().map(|maybe_batch| { + let arr: ArrayRef = Arc::new(StructArray::from(maybe_batch?)); + Ok(arr) + }); + let array_reader = Box::new(ArrayIterator::new( + array_reader, + Field::new_struct("", schema.fields().clone(), false) + .with_metadata(schema.metadata.clone()) + .into(), + )); + to_stream_pycapsule(py, array_reader, requested_schema) + } } impl From> for PyRecordBatchReader { @@ -147,29 +166,17 @@ impl PyRecordBatchReader { .unwrap() .take() .ok_or(PyIOError::new_err("Cannot read from closed stream"))?; - - let schema = reader.schema().clone(); - let array_reader = reader.into_iter().map(|maybe_batch| { - let arr: ArrayRef = Arc::new(StructArray::from(maybe_batch?)); - Ok(arr) - }); - let array_reader = Box::new(ArrayIterator::new( - array_reader, - Field::new_struct("", schema.fields().clone(), false) - .with_metadata(schema.metadata.clone()) - .into(), - )); - to_stream_pycapsule(py, array_reader, requested_schema) + Self::to_stream_pycapsule(py, reader, requested_schema) } // Return self // https://stackoverflow.com/a/52056290 - fn __iter__(&mut self, py: Python) -> PyResult { + fn __iter__<'py>(&'py mut self, py: Python<'py>) -> PyResult> { self.to_arro3(py) } - fn __next__(&mut self, py: Python) -> PyArrowResult { - self.read_next_batch(py) + fn __next__(&mut self) -> PyArrowResult { + self.read_next_batch() } fn __repr__(&self) -> String { @@ -210,7 +217,7 @@ impl PyRecordBatchReader { self.0.lock().unwrap().is_none() } - fn read_all(&mut self, py: Python) -> PyArrowResult { + fn read_all(&mut self) -> PyArrowResult { let stream = self .0 .lock() @@ -222,24 +229,24 @@ impl PyRecordBatchReader { for batch in stream { batches.push(batch?); } - Ok(PyTable::try_new(batches, schema)?.to_arro3(py)?) + Ok(PyTable::try_new(batches, schema)?.into()) } - fn read_next_batch(&mut self, py: Python) -> PyArrowResult { + fn read_next_batch(&mut self) -> PyArrowResult { let mut inner = self.0.lock().unwrap(); let stream = inner .as_mut() .ok_or(PyIOError::new_err("Cannot read from closed stream."))?; if let Some(next_batch) = stream.next() { - Ok(PyRecordBatch::new(next_batch?).to_arro3(py)?) + Ok(next_batch?.into()) } else { Err(PyStopIteration::new_err("").into()) } } #[getter] - fn schema(&self, py: Python) -> PyResult { - PySchema::new(self.schema_ref()?.clone()).to_arro3(py) + fn schema(&self) -> PyResult { + Ok(PySchema::new(self.schema_ref()?.clone()).into()) } } diff --git a/pyo3-arrow/src/scalar.rs b/pyo3-arrow/src/scalar.rs index 8d30677..79464ca 100644 --- a/pyo3-arrow/src/scalar.rs +++ b/pyo3-arrow/src/scalar.rs @@ -13,10 +13,12 @@ use pyo3::types::{PyCapsule, PyList, PyTuple, PyType}; use pyo3::{intern, IntoPyObjectExt}; use crate::error::PyArrowResult; +use crate::export::{Arro3DataType, Arro3Field, Arro3Scalar}; use crate::ffi::to_array_pycapsules; -use crate::{PyArray, PyDataType, PyField}; +use crate::{PyArray, PyField}; /// A Python-facing Arrow scalar +#[derive(Debug)] #[pyclass(module = "arro3.core._core", name = "Scalar", subclass)] pub struct PyScalar { array: ArrayRef, @@ -85,13 +87,12 @@ impl PyScalar { /// Export to an arro3.core.Scalar. /// /// This requires that you depend on arro3-core from your Python package. - pub fn to_arro3(&self, py: Python) -> PyResult { + pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult> { let arro3_mod = py.import(intern!(py, "arro3.core"))?; - let core_obj = arro3_mod.getattr(intern!(py, "Scalar"))?.call_method1( + arro3_mod.getattr(intern!(py, "Scalar"))?.call_method1( intern!(py, "from_arrow_pycapsule"), self.__arrow_c_array__(py, None)?, - )?; - core_obj.into_py_any(py) + ) } } @@ -131,7 +132,7 @@ impl PyScalar { &'py self, py: Python<'py>, requested_schema: Option>, - ) -> PyArrowResult> { + ) -> PyArrowResult> { to_array_pycapsules(py, self.field.clone(), &self.array, requested_schema) } @@ -390,18 +391,16 @@ impl PyScalar { Ok(result) } - fn cast(&self, py: Python, target_type: PyField) -> PyArrowResult { + fn cast(&self, target_type: PyField) -> PyArrowResult { let new_field = target_type.into_inner(); let new_array = arrow::compute::cast(&self.array, new_field.data_type())?; - Ok(PyScalar::try_new(new_array, new_field) - .unwrap() - .to_arro3(py)?) + Ok(PyScalar::try_new(new_array, new_field).unwrap().into()) } #[getter] #[pyo3(name = "field")] - fn py_field(&self, py: Python) -> PyResult { - PyField::new(self.field.clone()).to_arro3(py) + fn py_field(&self) -> Arro3Field { + self.field.clone().into() } #[getter] @@ -410,8 +409,8 @@ impl PyScalar { } #[getter] - fn r#type(&self, py: Python) -> PyResult { - PyDataType::new(self.field.data_type().clone()).to_arro3(py) + fn r#type(&self) -> Arro3DataType { + self.field.data_type().clone().into() } } diff --git a/pyo3-arrow/src/schema.rs b/pyo3-arrow/src/schema.rs index 4ebf3cc..96331a6 100644 --- a/pyo3-arrow/src/schema.rs +++ b/pyo3-arrow/src/schema.rs @@ -9,6 +9,7 @@ use pyo3::types::{PyBytes, PyCapsule, PyDict, PyTuple, PyType}; use pyo3::{intern, IntoPyObjectExt}; use crate::error::PyArrowResult; +use crate::export::{Arro3DataType, Arro3Field, Arro3Schema, Arro3Table}; use crate::ffi::from_python::utils::import_schema_pycapsule; use crate::ffi::to_python::nanoarrow::to_nanoarrow_schema; use crate::ffi::to_python::to_schema_pycapsule; @@ -18,6 +19,7 @@ use crate::{PyDataType, PyField, PyTable}; /// A Python-facing Arrow schema. /// /// This is a wrapper around a [SchemaRef]. +#[derive(Debug)] #[pyclass(module = "arro3.core._core", name = "Schema", subclass)] pub struct PySchema(SchemaRef); @@ -41,17 +43,16 @@ impl PySchema { } /// Export this to a Python `arro3.core.Schema`. - pub fn to_arro3(&self, py: Python) -> PyResult { + pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult> { let arro3_mod = py.import(intern!(py, "arro3.core"))?; - let core_obj = arro3_mod.getattr(intern!(py, "Schema"))?.call_method1( + arro3_mod.getattr(intern!(py, "Schema"))?.call_method1( intern!(py, "from_arrow_pycapsule"), PyTuple::new(py, vec![self.__arrow_c_schema__(py)?])?, - )?; - core_obj.into_py_any(py) + ) } /// Export this to a Python `nanoarrow.Schema`. - pub fn to_nanoarrow(&self, py: Python) -> PyResult { + pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult> { to_nanoarrow_schema(py, &self.__arrow_c_schema__(py)?) } @@ -134,8 +135,8 @@ impl PySchema { self.0 == other.0 } - fn __getitem__(&self, py: Python, key: FieldIndexInput) -> PyArrowResult { - self.field(py, key) + fn __getitem__(&self, key: FieldIndexInput) -> PyArrowResult { + self.field(key) } fn __len__(&self) -> usize { @@ -157,25 +158,23 @@ impl PySchema { Self::from_arrow_pycapsule(capsule) } - fn append(&self, py: Python, field: PyField) -> PyResult { + fn append(&self, field: PyField) -> Arro3Schema { let mut fields = self.0.fields().to_vec(); fields.push(field.into_inner()); - let schema = Schema::new_with_metadata(fields, self.0.metadata().clone()); - PySchema::new(schema.into()).to_arro3(py) + Schema::new_with_metadata(fields, self.0.metadata().clone()).into() } - fn empty_table(&self, py: Python) -> PyResult { - PyTable::try_new(vec![], self.into())?.to_arro3(py) + fn empty_table(&self) -> PyResult { + Ok(PyTable::try_new(vec![], self.into())?.into()) } fn equals(&self, other: PySchema) -> bool { self.0 == other.0 } - fn field(&self, py: Python, i: FieldIndexInput) -> PyArrowResult { + fn field(&self, i: FieldIndexInput) -> PyArrowResult { let index = i.into_position(&self.0)?; - let field = self.0.field(index); - Ok(PyField::new(field.clone().into()).to_arro3(py)?) + Ok(self.0.field(index).into()) } fn get_all_field_indices(&self, name: String) -> Vec { @@ -207,11 +206,10 @@ impl PySchema { } } - fn insert(&self, py: Python, i: usize, field: PyField) -> PyResult { + fn insert(&self, i: usize, field: PyField) -> Arro3Schema { let mut fields = self.0.fields().to_vec(); fields.insert(i, field.into_inner()); - let schema = Schema::new_with_metadata(fields, self.0.metadata().clone()); - PySchema::new(schema.into()).to_arro3(py) + Schema::new_with_metadata(fields, self.0.metadata().clone()).into() } // Note: we can't return HashMap, Vec> because that will coerce keys and values to @@ -238,47 +236,41 @@ impl PySchema { self.0.fields().iter().map(|f| f.name().clone()).collect() } - fn remove(&self, py: Python, i: usize) -> PyResult { + fn remove(&self, i: usize) -> Arro3Schema { let mut fields = self.0.fields().to_vec(); fields.remove(i); - let schema = Schema::new_with_metadata(fields, self.0.metadata().clone()); - PySchema::new(schema.into()).to_arro3(py) + Schema::new_with_metadata(fields, self.0.metadata().clone()).into() } - fn remove_metadata(&self, py: Python) -> PyResult { - PySchema::new( - self.0 - .as_ref() - .clone() - .with_metadata(Default::default()) - .into(), - ) - .to_arro3(py) + fn remove_metadata(&self) -> Arro3Schema { + self.0 + .as_ref() + .clone() + .with_metadata(Default::default()) + .into() } - fn set(&self, py: Python, i: usize, field: PyField) -> PyResult { + fn set(&self, i: usize, field: PyField) -> Arro3Schema { let mut fields = self.0.fields().to_vec(); fields[i] = field.into_inner(); - let schema = Schema::new_with_metadata(fields, self.0.metadata().clone()); - PySchema::new(schema.into()).to_arro3(py) + Schema::new_with_metadata(fields, self.0.metadata().clone()).into() } #[getter] - fn types(&self, py: Python) -> PyArrowResult> { - Ok(self - .0 + fn types(&self) -> Vec { + self.0 .fields() .iter() - .map(|f| PyDataType::new(f.data_type().clone()).to_arro3(py)) - .collect::>()?) + .map(|f| PyDataType::new(f.data_type().clone()).into()) + .collect() } - fn with_metadata(&self, py: Python, metadata: MetadataInput) -> PyResult { + fn with_metadata(&self, metadata: MetadataInput) -> PyResult { let schema = self .0 .as_ref() .clone() .with_metadata(metadata.into_string_hashmap()?); - PySchema::new(schema.into()).to_arro3(py) + Ok(schema.into()) } } diff --git a/pyo3-arrow/src/table.rs b/pyo3-arrow/src/table.rs index 855b3a2..eeb103f 100644 --- a/pyo3-arrow/src/table.rs +++ b/pyo3-arrow/src/table.rs @@ -13,6 +13,10 @@ use pyo3::types::{PyCapsule, PyTuple, PyType}; use pyo3::{intern, IntoPyObjectExt}; use crate::error::{PyArrowError, PyArrowResult}; +use crate::export::{ + Arro3ChunkedArray, Arro3Field, Arro3RecordBatch, Arro3RecordBatchReader, Arro3Schema, + Arro3Table, +}; use crate::ffi::from_python::utils::import_stream_pycapsule; use crate::ffi::to_python::chunked::ArrayIterator; use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream; @@ -75,17 +79,16 @@ impl PyTable { } /// Export this to a Python `arro3.core.Table`. - pub fn to_arro3(&self, py: Python) -> PyResult { + pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult> { let arro3_mod = py.import(intern!(py, "arro3.core"))?; - let core_obj = arro3_mod.getattr(intern!(py, "Table"))?.call_method1( + arro3_mod.getattr(intern!(py, "Table"))?.call_method1( intern!(py, "from_arrow_pycapsule"), PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?, - )?; - core_obj.into_py_any(py) + ) } /// Export this to a Python `nanoarrow.ArrayStream`. - pub fn to_nanoarrow(&self, py: Python) -> PyResult { + pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult> { to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?) } @@ -100,6 +103,26 @@ impl PyTable { pyarrow_obj.into_py_any(py) } + pub(crate) fn to_stream_pycapsule<'py>( + py: Python<'py>, + batches: Vec, + schema: SchemaRef, + requested_schema: Option>, + ) -> PyArrowResult> { + let field = schema.fields(); + let array_reader = batches.into_iter().map(|batch| { + let arr: ArrayRef = Arc::new(StructArray::from(batch)); + Ok(arr) + }); + let array_reader = Box::new(ArrayIterator::new( + array_reader, + Field::new_struct("", field.clone(), false) + .with_metadata(schema.metadata.clone()) + .into(), + )); + to_stream_pycapsule(py, array_reader, requested_schema) + } + pub(crate) fn rechunk(&self, chunk_lengths: Vec) -> PyArrowResult { let total_chunk_length = chunk_lengths.iter().sum::(); if total_chunk_length != self.num_rows() { @@ -213,26 +236,20 @@ impl PyTable { py: Python<'py>, requested_schema: Option>, ) -> PyArrowResult> { - let field = self.schema.fields().clone(); - let array_reader = self.batches.clone().into_iter().map(|batch| { - let arr: ArrayRef = Arc::new(StructArray::from(batch)); - Ok(arr) - }); - let array_reader = Box::new(ArrayIterator::new( - array_reader, - Field::new_struct("", field, false) - .with_metadata(self.schema.metadata.clone()) - .into(), - )); - to_stream_pycapsule(py, array_reader, requested_schema) + Self::to_stream_pycapsule( + py, + self.batches.clone(), + self.schema.clone(), + requested_schema, + ) } fn __eq__(&self, other: &PyTable) -> bool { self.batches == other.batches && self.schema == other.schema } - fn __getitem__(&self, py: Python, key: FieldIndexInput) -> PyArrowResult { - self.column(py, key) + fn __getitem__(&self, key: FieldIndexInput) -> PyArrowResult { + self.column(key) } fn __len__(&self) -> usize { @@ -361,11 +378,10 @@ impl PyTable { fn add_column( &self, - py: Python, i: usize, field: NameOrField, column: PyChunkedArray, - ) -> PyArrowResult { + ) -> PyArrowResult { if self.num_rows() != column.len() { return Err( PyValueError::new_err("Number of rows in column does not match table.").into(), @@ -398,15 +414,14 @@ impl PyTable { }) .collect::, PyArrowError>>()?; - Ok(PyTable::try_new(new_batches, new_schema)?.to_arro3(py)?) + Ok(PyTable::try_new(new_batches, new_schema)?.into()) } fn append_column( &self, - py: Python, field: NameOrField, column: PyChunkedArray, - ) -> PyArrowResult { + ) -> PyArrowResult { if self.num_rows() != column.len() { return Err( PyValueError::new_err("Number of rows in column does not match table.").into(), @@ -439,7 +454,7 @@ impl PyTable { }) .collect::, PyArrowError>>()?; - Ok(PyTable::try_new(new_batches, new_schema)?.to_arro3(py)?) + Ok(PyTable::try_new(new_batches, new_schema)?.into()) } #[getter] @@ -447,7 +462,7 @@ impl PyTable { self.batches.iter().map(|batch| batch.num_rows()).collect() } - fn column(&self, py: Python, i: FieldIndexInput) -> PyArrowResult { + fn column(&self, i: FieldIndexInput) -> PyArrowResult { let column_index = i.into_position(&self.schema)?; let field = self.schema.field(column_index).clone(); let chunks = self @@ -455,7 +470,7 @@ impl PyTable { .iter() .map(|batch| batch.column(column_index).clone()) .collect(); - Ok(PyChunkedArray::try_new(chunks, field.into())?.to_arro3(py)?) + Ok(PyChunkedArray::try_new(chunks, field.into())?.into()) } #[getter] @@ -468,20 +483,20 @@ impl PyTable { } #[getter] - fn columns(&self, py: Python) -> PyArrowResult> { + fn columns(&self) -> PyArrowResult> { (0..self.num_columns()) - .map(|i| self.column(py, FieldIndexInput::Position(i))) + .map(|i| self.column(FieldIndexInput::Position(i))) .collect() } - fn combine_chunks(&self, py: Python) -> PyArrowResult { + fn combine_chunks(&self) -> PyArrowResult { let batch = concat_batches(&self.schema, &self.batches)?; - Ok(PyTable::try_new(vec![batch], self.schema.clone())?.to_arro3(py)?) + Ok(PyTable::try_new(vec![batch], self.schema.clone())?.into()) } - fn field(&self, py: Python, i: FieldIndexInput) -> PyArrowResult { + fn field(&self, i: FieldIndexInput) -> PyArrowResult { let field = self.schema.field(i.into_position(&self.schema)?); - Ok(PyField::new(field.clone().into()).to_arro3(py)?) + Ok(PyField::new(field.clone().into()).into()) } #[getter] @@ -505,7 +520,7 @@ impl PyTable { #[pyo3(signature = (*, max_chunksize=None))] #[pyo3(name = "rechunk")] - fn rechunk_py(&self, py: Python, max_chunksize: Option) -> PyArrowResult { + fn rechunk_py(&self, max_chunksize: Option) -> PyArrowResult { let max_chunksize = max_chunksize.unwrap_or(self.num_rows()); if max_chunksize == 0 { return Err(PyValueError::new_err("max_chunksize must be > 0").into()); @@ -518,10 +533,10 @@ impl PyTable { offset += chunk_length; chunk_lengths.push(chunk_length); } - Ok(self.rechunk(chunk_lengths)?.to_arro3(py)?) + Ok(self.rechunk(chunk_lengths)?.into()) } - fn remove_column(&self, py: Python, i: usize) -> PyArrowResult { + fn remove_column(&self, i: usize) -> PyArrowResult { let mut fields = self.schema.fields().to_vec(); fields.remove(i); let new_schema = Arc::new(Schema::new_with_metadata( @@ -539,10 +554,10 @@ impl PyTable { }) .collect::, PyArrowError>>()?; - Ok(PyTable::try_new(new_batches, new_schema)?.to_arro3(py)?) + Ok(PyTable::try_new(new_batches, new_schema)?.into()) } - fn rename_columns(&self, py: Python, names: Vec) -> PyArrowResult { + fn rename_columns(&self, names: Vec) -> PyArrowResult { if names.len() != self.num_columns() { return Err(PyValueError::new_err("When names is a list[str], must pass the same number of names as there are columns.").into()); } @@ -558,15 +573,15 @@ impl PyTable { new_fields, self.schema.metadata().clone(), )); - Ok(PyTable::try_new(self.batches.clone(), new_schema)?.to_arro3(py)?) + Ok(PyTable::try_new(self.batches.clone(), new_schema)?.into()) } #[getter] - fn schema(&self, py: Python) -> PyResult { - PySchema::new(self.schema.clone()).to_arro3(py) + fn schema(&self) -> Arro3Schema { + PySchema::new(self.schema.clone()).into() } - fn select(&self, py: Python, columns: SelectIndices) -> PyArrowResult { + fn select(&self, columns: SelectIndices) -> PyArrowResult { let positions = columns.into_positions(self.schema.fields())?; let new_schema = Arc::new(self.schema.project(&positions)?); @@ -575,16 +590,15 @@ impl PyTable { .iter() .map(|batch| batch.project(&positions)) .collect::, ArrowError>>()?; - Ok(PyTable::try_new(new_batches, new_schema)?.to_arro3(py)?) + Ok(PyTable::try_new(new_batches, new_schema)?.into()) } fn set_column( &self, - py: Python, i: usize, field: NameOrField, column: PyChunkedArray, - ) -> PyArrowResult { + ) -> PyArrowResult { if self.num_rows() != column.len() { return Err( PyValueError::new_err("Number of rows in column does not match table.").into(), @@ -617,7 +631,7 @@ impl PyTable { }) .collect::, PyArrowError>>()?; - Ok(PyTable::try_new(new_batches, new_schema)?.to_arro3(py)?) + Ok(PyTable::try_new(new_batches, new_schema)?.into()) } #[getter] @@ -627,33 +641,27 @@ impl PyTable { #[pyo3(signature = (offset=0, length=None))] #[pyo3(name = "slice")] - fn slice_py( - &self, - py: Python, - offset: usize, - length: Option, - ) -> PyArrowResult { + fn slice_py(&self, offset: usize, length: Option) -> PyArrowResult { let length = length.unwrap_or_else(|| self.num_rows() - offset); - let sliced_chunked_array = self.slice(offset, length)?; - Ok(sliced_chunked_array.to_arro3(py)?) + Ok(self.slice(offset, length)?.into()) } - fn to_batches(&self, py: Python) -> PyResult> { + fn to_batches(&self) -> Vec { self.batches .iter() - .map(|batch| PyRecordBatch::new(batch.clone()).to_arro3(py)) + .map(|batch| PyRecordBatch::new(batch.clone()).into()) .collect() } - fn to_reader(&self, py: Python) -> PyResult { + fn to_reader(&self) -> Arro3RecordBatchReader { let reader = Box::new(RecordBatchIterator::new( self.batches.clone().into_iter().map(Ok), self.schema.clone(), )); - PyRecordBatchReader::new(reader).to_arro3(py) + PyRecordBatchReader::new(reader).into() } - fn to_struct_array(&self, py: Python) -> PyArrowResult { + fn to_struct_array(&self) -> PyArrowResult { let chunks = self .batches .iter() @@ -664,16 +672,16 @@ impl PyTable { .collect::>(); let field = Field::new_struct("", self.schema.fields().clone(), false) .with_metadata(self.schema.metadata.clone()); - Ok(PyChunkedArray::try_new(chunks, field.into())?.to_arro3(py)?) + Ok(PyChunkedArray::try_new(chunks, field.into())?.into()) } - fn with_schema(&self, py: Python, schema: PySchema) -> PyArrowResult { + fn with_schema(&self, schema: PySchema) -> PyArrowResult { let new_schema = schema.into_inner(); let new_batches = self .batches .iter() .map(|batch| RecordBatch::try_new(new_schema.clone(), batch.columns().to_vec())) .collect::, ArrowError>>()?; - Ok(PyTable::try_new(new_batches, new_schema)?.to_arro3(py)?) + Ok(PyTable::try_new(new_batches, new_schema)?.into()) } }