From 78cd23b3cc30713225c3d35be2c8a4c075350baf Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Wed, 27 Sep 2023 20:41:50 -0400
Subject: [PATCH] cargo fmt

---
 .../src/datasource/file_format/parquet.rs     | 37 ++++++++++---------
 1 file changed, 19 insertions(+), 18 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 7d7386a2af52..155c5304d2c7 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -19,7 +19,9 @@
 use arrow_array::RecordBatch;
-use parquet::arrow::arrow_writer::{ArrowColumnWriter, get_column_writers, ArrowLeafColumn, compute_leaves};
+use parquet::arrow::arrow_writer::{
+    compute_leaves, get_column_writers, ArrowColumnWriter, ArrowLeafColumn,
+};
 use parquet::column::writer::ColumnCloseResult;
 use parquet::file::writer::SerializedFileWriter;
 use rand::distributions::DistString;
@@ -885,22 +887,19 @@ fn spawn_column_parallel_row_group_writer(
     parquet_props: Arc<WriterProperties>,
     max_buffer_size: usize,
 ) -> Result<(Vec<JoinHandle<Result<ArrowColumnWriter>>>, Vec<Sender<ArrowLeafColumn>>)> {
-    let schema_desc = arrow_to_parquet_schema(&schema)?;
-    let col_writers =
-        get_column_writers(&schema_desc, &parquet_props, &schema)?;
+    let schema_desc = arrow_to_parquet_schema(&schema)?;
+    let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
     let num_columns = col_writers.len();
 
     let mut col_writer_handles = Vec::with_capacity(num_columns);
     let mut col_array_channels = Vec::with_capacity(num_columns);
     for writer in col_writers.into_iter() {
         // Buffer size of this channel limits the number of arrays queued up for column level serialization
-        let (send_array, recieve_array) = mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
+        let (send_array, recieve_array) =
+            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
         col_array_channels.push(send_array);
-        col_writer_handles.push(tokio::spawn(column_serializer_task(
-            recieve_array,
-            writer,
-        )))
+        col_writer_handles
+            .push(tokio::spawn(column_serializer_task(recieve_array, writer)))
     }
 
     Ok((col_writer_handles, col_array_channels))
@@ -929,8 +928,9 @@ async fn send_arrays_to_col_writers(
         .iter()
         .zip(rb.columns())
        .zip(schema.fields())
-        .map(|((a,b),c)| (a,b,c)) {
-        for c in compute_leaves(field, array)?{
+        .map(|((a, b), c)| (a, b, c))
+    {
+        for c in compute_leaves(field, array)? {
             tx.send(c).await.map_err(|_| {
                 DataFusionError::Internal("Unable to send array to writer!".into())
             })?;
@@ -1031,12 +1031,14 @@ fn spawn_parquet_parallel_serialization_task(
     for mut rx in rb_recievers {
         while let Some(rb) = rx.recv().await {
             if current_rg_rows + rb.num_rows() < max_row_group_rows {
-                send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone()).await?;
+                send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
+                    .await?;
                 current_rg_rows += rb.num_rows();
             } else {
                 let rows_left = max_row_group_rows - current_rg_rows;
                 let a = rb.slice(0, rows_left);
-                send_arrays_to_col_writers(&col_array_channels, &a, schema.clone()).await?;
+                send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
+                    .await?;
 
                 // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
                 // on a separate task, so that we can immediately start on the next RG before waiting
@@ -1060,7 +1062,8 @@ fn spawn_parquet_parallel_serialization_task(
                     writer_props.clone(),
                     max_buffer_rb,
                 )?;
-                send_arrays_to_col_writers(&col_array_channels, &b, schema.clone()).await?;
+                send_arrays_to_col_writers(&col_array_channels, &b, schema.clone())
+                    .await?;
                 current_rg_rows = b.num_rows();
             }
         }
@@ -1068,10 +1071,8 @@ fn spawn_parquet_parallel_serialization_task(
 
     // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
     drop(col_array_channels);
-    let finalize_rg_task = spawn_rg_join_and_finalize_task(
-        column_writer_handles,
-        current_rg_rows,
-    );
+    let finalize_rg_task =
+        spawn_rg_join_and_finalize_task(column_writer_handles, current_rg_rows);
 
     serialize_tx.send(finalize_rg_task).await.map_err(|_| {
         DataFusionError::Internal("Unable to send closed RG to concat task!".into())