
Add unwraps and debug prints to dataset_exec.rs
This is a temporary commit for the purpose of tracking down the bug.
Michael-J-Ward committed Aug 27, 2024
1 parent d91b738 commit ae7470e
Showing 1 changed file with 45 additions and 19 deletions.
src/dataset_exec.rs
@@ -53,13 +53,25 @@ impl Iterator for PyArrowBatchesAdapter
 
     fn next(&mut self) -> Option<Self::Item> {
         Python::with_gil(|py| {
+            println!("getting next pyarrow batch");
             let mut batches = self.batches.clone().into_bound(py);
-            Some(
-                batches
-                    .next()?
-                    .and_then(|batch| Ok(batch.extract::<PyArrowType<_>>()?.0))
-                    .map_err(|err| ArrowError::ExternalError(Box::new(err))),
-            )
+
+            let next_batch = batches.next().expect("no next batch").expect("failed to get next batch");
+
+            // NOTE: This is where the failure actually occurs.
+            // It occurs because `from_pyarrow_bound` uses the default `RecordBatchOptions`, which does *not* allow a batch with no columns.
+            // See https://github.com/apache/arrow-rs/pull/1552 for more details.
+            let extracted = next_batch.extract::<PyArrowType<_>>().expect("failed to extract batch");
+            Some(Ok(extracted.0))
+
+            // Some(Ok(
+            //     batches
+            //         .next()
+            //         .unwrap()
+            //         .and_then(|batch| Ok(batch.extract::<PyArrowType<_>>().unwrap().0))
+            //         .unwrap()
+            //         // .map_err(|err| ArrowError::ExternalError(Box::new(err))),
+            // ))
         })
     }
 }
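
The failure mode described in the NOTE above can be reproduced directly against arrow-rs, with no PyO3 involved. Below is a minimal standalone sketch (illustrative, assuming a recent `arrow` crate; not part of this commit): `RecordBatch::try_new` rejects a batch with no columns because it cannot infer a row count, while `try_new_with_options` accepts one when given an explicit row count, which is the escape hatch added in the arrow-rs pull request linked above.

    use std::sync::Arc;

    use arrow::datatypes::Schema;
    use arrow::record_batch::{RecordBatch, RecordBatchOptions};

    fn main() {
        let schema = Arc::new(Schema::empty());

        // Default options: with no columns and no explicit row count, the
        // batch length cannot be inferred, so construction fails. This is
        // the path the default `RecordBatchOptions` takes.
        assert!(RecordBatch::try_new(schema.clone(), vec![]).is_err());

        // With an explicit row count, a zero-column batch is accepted.
        let options = RecordBatchOptions::new().with_row_count(Some(3));
        let batch = RecordBatch::try_new_with_options(schema, vec![], &options)
            .expect("zero-column batch with explicit row count");
        assert_eq!(batch.num_rows(), 3);
    }
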
@@ -83,6 +95,7 @@ impl DatasetExec {
         projection: Option<Vec<usize>>,
         filters: &[Expr],
     ) -> Result<Self, DataFusionError> {
+        println!("initiating new DatasetExec");
         let columns: Option<Result<Vec<String>, DataFusionError>> = projection.map(|p| {
             p.iter()
                 .map(|index| {
@@ -138,7 +151,7 @@
             Partitioning::UnknownPartitioning(fragments.len()),
             ExecutionMode::Bounded,
         );
-
+        println!("initiating new DatasetExec: done");
         Ok(DatasetExec {
             dataset: dataset.clone().unbind(),
             schema,
@@ -184,45 +197,58 @@ impl ExecutionPlan for DatasetExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
+        println!("executing DatasetExec");
         let batch_size = context.session_config().batch_size();
         Python::with_gil(|py| {
             let dataset = self.dataset.bind(py);
             let fragments = self.fragments.bind(py);
             let fragment = fragments
                 .get_item(partition)
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
 
             // We need to pass the dataset schema to unify the fragment and dataset schema per PyArrow docs
             let dataset_schema = dataset
                 .getattr("schema")
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+            println!("dataset_schema: {:?}", dataset_schema);
             let kwargs = PyDict::new_bound(py);
             kwargs
-                .set_item("columns", self.columns.clone())
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .set_item("columns", self.columns.clone()).unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
             kwargs
                 .set_item(
                     "filter",
                     self.filter_expr.as_ref().map(|expr| expr.clone_ref(py)),
-                )
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                ).unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
             kwargs
                 .set_item("batch_size", batch_size)
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
             let scanner = fragment
                 .call_method("scanner", (dataset_schema,), Some(&kwargs))
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
             let schema: SchemaRef = Arc::new(
                 scanner
                     .getattr("projected_schema")
-                    .and_then(|schema| Ok(schema.extract::<PyArrowType<_>>()?.0))
-                    .map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
+                    .and_then(|schema| {
+                        let pyarrow_schema = schema.extract::<PyArrowType<_>>().unwrap().0;
+                        println!("pyarrow_schema: {:?}", pyarrow_schema);
+                        Ok(pyarrow_schema)
+                    })
+                    .unwrap(),
+                // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
             );
             let record_batches: Bound<'_, PyIterator> = scanner
                 .call_method0("to_batches")
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?
+                .unwrap()
+                // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?
                 .iter()
-                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
+                .unwrap();
+            // .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
 
             let record_batches = PyArrowBatchesAdapter {
                 batches: record_batches.into(),
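
For context on what the debugging scaffold above temporarily replaces: each commented-out `map_err` converts a Python exception into DataFusion's external error type so that `execute` can propagate it with `?` instead of panicking. A hedged sketch of that pattern follows, assuming pyo3's Bound API as used in this file; `py_err_to_df` and `scan_fragment` are hypothetical names introduced here for illustration, not code from this commit.

    use datafusion::error::DataFusionError;
    use pyo3::prelude::*;
    use pyo3::types::PyDict;

    // Hypothetical helper mirroring the commented-out `map_err` closures:
    // wrap a Python exception as an external DataFusion error.
    fn py_err_to_df(err: PyErr) -> DataFusionError {
        DataFusionError::External(Box::new(err))
    }

    // Hypothetical extraction of the scanner call, showing the kwargs plus
    // call_method pattern with errors propagated instead of unwrapped.
    fn scan_fragment<'py>(
        py: Python<'py>,
        fragment: &Bound<'py, PyAny>,
        dataset_schema: &Bound<'py, PyAny>,
        batch_size: usize,
    ) -> Result<Bound<'py, PyAny>, DataFusionError> {
        let kwargs = PyDict::new_bound(py);
        kwargs
            .set_item("batch_size", batch_size)
            .map_err(py_err_to_df)?;
        // On the Python side this is: fragment.scanner(dataset_schema, batch_size=batch_size)
        fragment
            .call_method("scanner", (dataset_schema,), Some(&kwargs))
            .map_err(py_err_to_df)
    }

With such a helper, each call site reverts from `.unwrap()` to `.map_err(py_err_to_df)?` once the debugging is done.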
