diff --git a/src/dataset_exec.rs b/src/dataset_exec.rs
index 5fe1f4d1..ea8cf8dd 100644
--- a/src/dataset_exec.rs
+++ b/src/dataset_exec.rs
@@ -53,13 +53,25 @@ impl Iterator for PyArrowBatchesAdapter {
     fn next(&mut self) -> Option<Self::Item> {
         Python::with_gil(|py| {
+            println!("getting next pyarrow batch");
             let mut batches = self.batches.clone().into_bound(py);
-            Some(
-                batches
-                    .next()?
-                    .and_then(|batch| Ok(batch.extract::<PyArrowType<RecordBatch>>()?.0))
-                    .map_err(|err| ArrowError::ExternalError(Box::new(err))),
-            )
+
+            let next_batch = batches.next().expect("no next batch").expect("failed to get next batch");
+
+            // NOTE: This is where the failure actually occurs.
+            // It occurs because `from_pyarrow_bound` uses the default `RecordBatchOptions`, which does *not* allow a batch with no columns.
+            // See https://github.com/apache/arrow-rs/pull/1552 for more details.
+            let extracted = next_batch.extract::<PyArrowType<RecordBatch>>().expect("failed to extract batch");
+            Some(Ok(extracted.0))
+
+            // Some(Ok(
+            //     batches
+            //         .next()
+            //         .unwrap()
+            //         .and_then(|batch| Ok(batch.extract::<PyArrowType<RecordBatch>>().unwrap().0))
+            //         .unwrap()
+            //         // .map_err(|err| ArrowError::ExternalError(Box::new(err))),
+            // ))
         })
     }
 }
@@ -83,6 +95,7 @@ impl DatasetExec {
         projection: Option<Vec<usize>>,
         filters: &[Expr],
     ) -> Result<Self, DataFusionError> {
+        println!("initiating new DatasetExec");
         let columns: Option<Result<Vec<String>, DataFusionError>> = projection.map(|p| {
             p.iter()
                 .map(|index| {
@@ -138,7 +151,7 @@ impl DatasetExec {
             Partitioning::UnknownPartitioning(fragments.len()),
             ExecutionMode::Bounded,
         );
-
+        println!("initiating new DatasetExec: done");
         Ok(DatasetExec {
             dataset: dataset.clone().unbind(),
             schema,
@@ -184,45 +197,58 @@ impl ExecutionPlan for DatasetExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
+        println!("executing DatasetExec");
         let batch_size = context.session_config().batch_size();
         Python::with_gil(|py| {
             let dataset = self.dataset.bind(py);
             let fragments = self.fragments.bind(py);
             let fragment = fragments
                 .get_item(partition)
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
 
             // We need to pass the dataset schema to unify the fragment and dataset schema per PyArrow docs
             let dataset_schema = dataset
                 .getattr("schema")
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+            println!("dataset_schema: {:?}", dataset_schema);
             let kwargs = PyDict::new_bound(py);
             kwargs
-                .set_item("columns", self.columns.clone())
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .set_item("columns", self.columns.clone()).unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
             kwargs
                 .set_item(
                     "filter",
                     self.filter_expr.as_ref().map(|expr| expr.clone_ref(py)),
-                )
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                ).unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
             kwargs
                 .set_item("batch_size", batch_size)
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
             let scanner = fragment
                 .call_method("scanner", (dataset_schema,), Some(&kwargs))
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
             let schema: SchemaRef = Arc::new(
                 scanner
                     .getattr("projected_schema")
-                    .and_then(|schema| Ok(schema.extract::<PyArrowType<Schema>>()?.0))
-                    .map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
+                    .and_then(|schema| {
+                        let pyarrow_schema = schema.extract::<PyArrowType<Schema>>().unwrap().0;
+                        println!("pyarrow_schema: {:?}", pyarrow_schema);
+                        Ok(pyarrow_schema)
+                    })
+                    .unwrap(),
+                // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
             );
             let record_batches: Bound<'_, PyIterator> = scanner
                 .call_method0("to_batches")
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?
+                .unwrap()
+                // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?
                 .iter()
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
             let record_batches = PyArrowBatchesAdapter {
                 batches: record_batches.into(),
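For context on the NOTE in the first hunk, here is a minimal standalone sketch (not part of this patch) of the zero-column behaviour in arrow-rs, assuming a recent `arrow` crate. `RecordBatch::try_new` rejects a batch with no columns because the row count cannot be inferred, while `RecordBatchOptions::with_row_count` (the opt-in added in apache/arrow-rs#1552) makes such a batch valid; the working hypothesis is that `from_pyarrow_bound` takes the default, rejecting path.

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::empty());

    // Default options: a zero-column batch is rejected because the row
    // count cannot be inferred from an empty column list.
    assert!(RecordBatch::try_new(schema.clone(), vec![]).is_err());

    // An explicit row count opts in to zero-column batches (the shape a
    // pyarrow scan can produce when the projection selects no columns).
    let options = RecordBatchOptions::new().with_row_count(Some(10));
    let batch = RecordBatch::try_new_with_options(schema, vec![], &options)?;
    assert_eq!(batch.num_rows(), 10);
    assert_eq!(batch.num_columns(), 0);
    Ok(())
}
```

If that hypothesis holds, the durable fix would likely sit in the pyarrow conversion (supplying `RecordBatchOptions` with an explicit row count) rather than in this adapter; the `.expect()` calls and `println!`s in the diff above are debugging scaffolding to surface the failing step, not intended behaviour.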