From 8317fbcfc2490230cd0f7eb0d83d52c33b7c3c29 Mon Sep 17 00:00:00 2001 From: Will Leach <4619280+melbourne2991@users.noreply.github.com> Date: Fri, 16 Feb 2024 14:24:03 +1100 Subject: [PATCH] lift partitioning above file format --- Cargo.lock | 1 - crates/datasources/Cargo.toml | 1 - .../src/common/sink/hive_partitioning.rs | 194 ++++++++++ crates/datasources/src/common/sink/mod.rs | 1 + crates/datasources/src/common/sink/parquet.rs | 166 ++------- .../src/common/sink/write/demux.rs | 2 +- .../datasources/src/common/sink/write/mod.rs | 178 +--------- .../src/common/sink/write/parquet.rs | 330 ------------------ .../src/planner/physical_plan/copy_to.rs | 41 +-- 9 files changed, 243 insertions(+), 671 deletions(-) create mode 100644 crates/datasources/src/common/sink/hive_partitioning.rs delete mode 100644 crates/datasources/src/common/sink/write/parquet.rs diff --git a/Cargo.lock b/Cargo.lock index 53886b720..0ef288b55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2469,7 +2469,6 @@ name = "datasources" version = "0.8.4" dependencies = [ "apache-avro", - "arrow-array", "arrow-schema", "async-channel", "async-stream", diff --git a/crates/datasources/Cargo.toml b/crates/datasources/Cargo.toml index 8ae258a69..671e9e4d1 100644 --- a/crates/datasources/Cargo.toml +++ b/crates/datasources/Cargo.toml @@ -13,7 +13,6 @@ apache-avro = "0.16" async-channel = "2.1.1" async-stream = "0.3.5" async-trait = { workspace = true } -arrow-array = { version = "50.0.0", default-features = false } arrow-schema = { version = "50.0.0", default-features = false } bigquery-storage = { git = "https://github.com/glaredb/bigquery-storage", branch = "deps/2023-10-27-update" } bitvec = "1" diff --git a/crates/datasources/src/common/sink/hive_partitioning.rs b/crates/datasources/src/common/sink/hive_partitioning.rs new file mode 100644 index 000000000..c69d133ed --- /dev/null +++ b/crates/datasources/src/common/sink/hive_partitioning.rs @@ -0,0 +1,194 @@ +use std::{any::Any, fmt, sync::Arc}; +use async_trait::async_trait; +use datafusion::arrow::datatypes::{DataType, Schema}; +use datafusion::common::Result as DfResult; +use datafusion::error::DataFusionError; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::{ + execution::TaskContext, + physical_plan::{ + insert::DataSink, metrics::MetricsSet, DisplayAs, DisplayFormatType, + SendableRecordBatchStream, + }, +}; +use object_store::path::Path as ObjectPath; +use tokio::task::JoinSet; + +use crate::common::sink::write::demux::start_demuxer_task; + +/// A data sink factory used to create a sink for a given path. +pub trait SinkProducer: std::fmt::Debug + Send + Sync { + fn create_sink(&self, path: ObjectPath) -> Box; +} + +/// A data sink that takes a stream of record batches and writes them to a hive-partitioned +/// directory structure. Delegating creation of underlying sinks to a `SinkProducer`. 
+#[derive(Debug)] +pub struct HivePartitionedSinkAdapter { + producer: S, + partition_columns: Vec<(String, DataType)>, + base_output_path: ObjectPath, + file_extension: String, + schema: Arc, +} + +impl HivePartitionedSinkAdapter { + pub fn new( + producer: S, + partition_columns: Vec<(String, DataType)>, + base_output_path: ObjectPath, + file_extension: String, + schema: Arc, + ) -> Self { + HivePartitionedSinkAdapter { + producer, + partition_columns, + base_output_path, + file_extension, + schema, + } + } + + fn get_writer_schema(&self) -> Arc { + if !self.partition_columns.is_empty() { + let schema = &self.schema; + let partition_names: Vec<_> = self + + .partition_columns + .iter() + .map(|(s, _)| s) + .collect(); + + let filtered_schema = Arc::new(Schema::new( + schema + .fields() + .iter() + .filter(|f| !partition_names.contains(&f.name())) + .map(|f| (**f).clone()) + .collect::>(), + )); + + filtered_schema + } else { + self.schema.clone() + } + } +} + +impl fmt::Display for HivePartitionedSinkAdapter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: display more details + write!(f, "SinkPartitioner()") + } +} + +impl DisplayAs for HivePartitionedSinkAdapter { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default => write!(f, "{self}"), + DisplayFormatType::Verbose => write!(f, "{self}"), + } + } +} + +#[async_trait] +impl DataSink for HivePartitionedSinkAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn metrics(&self) -> Option { + None + } + + async fn write_all( + &self, + stream: SendableRecordBatchStream, + context: &Arc, + ) -> DfResult { + if self.partition_columns.is_empty() { + let sink = self.producer.create_sink(self.base_output_path.clone()); + return sink.write_all(stream, context).await; + } + + let (demux_task, mut file_stream_rx) = start_demuxer_task( + stream, + context, + Some(self.partition_columns.clone()), + self.base_output_path.clone(), + self.file_extension.clone(), + ); + + let mut sink_write_tasks: JoinSet> = JoinSet::new(); + let writer_schema = self.get_writer_schema(); + + while let Some((path, mut rx)) = file_stream_rx.recv().await { + let ctx = context.clone(); + let sink = self.producer.create_sink(path); + + let stream = async_stream::stream! { + while let Some(item) = rx.recv().await { + yield Ok(item); + } + }; + + let record_batch_stream = + Box::pin(RecordBatchStreamAdapter::new(writer_schema.clone(), stream)); + + sink_write_tasks.spawn(async move { + sink.write_all(record_batch_stream, &ctx) + .await + .map(|row_count| row_count as usize) + }); + } + + let mut row_count = 0; + + while let Some(result) = sink_write_tasks.join_next().await { + match result { + Ok(r) => { + row_count += r?; + } + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } + } + } + } + + match demux_task.await { + Ok(r) => r?, + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } + } + } + + Ok(row_count as u64) + } +} + +/// Get the partition columns with their types from the schema. 
+pub fn get_partition_columns_with_types( + schema: &Schema, + columns: Vec, +) -> DfResult> { + let partition_columns: DfResult> = columns + .iter() + .map(|col| { + schema + .field_with_name(col) + .map(|field| (field.name().to_owned(), field.data_type().to_owned())) + .map_err(|e| DataFusionError::External(Box::new(e))) + }) + .collect(); + + partition_columns +} + diff --git a/crates/datasources/src/common/sink/mod.rs b/crates/datasources/src/common/sink/mod.rs index 9fd407420..0311cbd8f 100644 --- a/crates/datasources/src/common/sink/mod.rs +++ b/crates/datasources/src/common/sink/mod.rs @@ -1,4 +1,5 @@ mod write; +pub mod hive_partitioning; pub mod bson; pub mod csv; diff --git a/crates/datasources/src/common/sink/parquet.rs b/crates/datasources/src/common/sink/parquet.rs index 75be04491..9229effc8 100644 --- a/crates/datasources/src/common/sink/parquet.rs +++ b/crates/datasources/src/common/sink/parquet.rs @@ -2,11 +2,9 @@ use std::any::Any; use std::fmt; use std::sync::Arc; -use arrow_schema::Schema; +use super::hive_partitioning::SinkProducer; use async_trait::async_trait; -use datafusion::arrow::datatypes::DataType; use datafusion::common::Result as DfResult; -use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::error::DataFusionError; use datafusion::execution::TaskContext; use datafusion::parquet::arrow::AsyncArrowWriter; @@ -17,21 +15,21 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType, SendableRecordBatc use futures::StreamExt; use object_store::path::Path as ObjectPath; use object_store::ObjectStore; -use tokio::io::AsyncWrite; -use tokio::task::JoinSet; - -use crate::common::sink::write::parquet::{output_single_parquet_file_parallelized, ParallelParquetWriterOptions}; -use crate::common::sink::write::{AbortableWrite, MultiPart}; - -use super::write::demux::start_demuxer_task; const BUFFER_SIZE: usize = 8 * 1024 * 1024; + #[derive(Debug, Clone)] pub struct ParquetSinkOpts { pub row_group_size: usize, - pub partition_columns: Vec<(String, DataType)>, - pub output_schema: Arc, +} + +impl Default for ParquetSinkOpts { + fn default() -> Self { + ParquetSinkOpts { + row_group_size: 122880, + } + } } /// Writes parquet files to object storage. @@ -95,123 +93,6 @@ impl ParquetSink { Ok(stats.num_rows as usize) } - - async fn stream_into_partitioned_inner( - &self, - stream: SendableRecordBatchStream, - context: &Arc, - ) -> DfResult { - let (demux_task, mut file_stream_rx) = start_demuxer_task(stream, context, Some(self.opts.partition_columns.clone()), self.loc.clone(), "parquet".into()); - - let mut file_write_tasks: JoinSet> = - JoinSet::new(); - - async fn create_writer(object_store: Arc, location: &ObjectPath) -> DfResult>> { - let (multipart_id, writer) = object_store - .put_multipart(location) - .await - .map_err(DataFusionError::ObjectStore)?; - - Ok(AbortableWrite::new( - // Parquet files as a whole are never compressed, since they - // manage compressed blocks themselves. 
- FileCompressionType::UNCOMPRESSED.convert_async_writer(writer)?, - MultiPart::new(object_store, multipart_id, location.clone()), - )) - } - - while let Some((path, rx)) = file_stream_rx.recv().await { - let writer = create_writer( - self.store.clone(), - &path, - ) - .await?; - - let schema = self.get_writer_schema(); - let props = WriterProperties::builder() - .set_created_by("GlareDB".to_string()) - .set_max_row_group_size(self.opts.row_group_size) - .build(); - - // TODO: expose this somewhere - let parallel_options_clone = ParallelParquetWriterOptions { - max_parallel_row_groups: 1, - max_buffered_record_batches_per_stream: 2, - }; - - file_write_tasks.spawn(async move { - output_single_parquet_file_parallelized( - writer, - rx, - schema, - &props, - parallel_options_clone, - ) - .await - }); - } - - let mut row_count = 0; - while let Some(result) = file_write_tasks.join_next().await { - match result { - Ok(r) => { - row_count += r?; - } - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } else { - unreachable!(); - } - } - } - } - - match demux_task.await { - Ok(r) => r?, - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } else { - unreachable!(); - } - } - } - - Ok(row_count) - } - - /// Converts table schema to writer schema, which may differ in the case - /// of hive style partitioning where some columns are removed from the - /// underlying files. - fn get_writer_schema(&self) -> Arc { - if !self.opts.partition_columns.is_empty() { - let schema = &self.opts.output_schema; - let partition_names: Vec<_> = self - .opts - .partition_columns - .iter() - .map(|(s, _)| s) - .collect(); - - println!("partition_names: {:?}", partition_names); - - let filtered_schema = Arc::new(Schema::new( - schema - .fields() - .iter() - .filter(|f| !partition_names.contains(&f.name())) - .map(|f| (**f).clone()) - .collect::>(), - )); - - println!("filtered_schema: {:?}", filtered_schema); - - filtered_schema - } else { - self.opts.output_schema.clone() - } - } } #[async_trait] @@ -227,13 +108,30 @@ impl DataSink for ParquetSink { async fn write_all( &self, data: SendableRecordBatchStream, - context: &Arc, + _context: &Arc, ) -> DfResult { - match self.opts.partition_columns.is_empty() { - true => self.stream_into_inner(data).await.map(|x| x as u64), - false => self.stream_into_partitioned_inner(data, context).await.map(|x| x as u64), - } + self.stream_into_inner(data).await.map(|x| x as u64) } } +#[derive(Debug)] +pub struct ParquetSinkProducer { + store: Arc, + opts: ParquetSinkOpts, +} + +impl ParquetSinkProducer { + pub fn from_obj_store(store: Arc, opts: ParquetSinkOpts) -> Self { + ParquetSinkProducer { store, opts } + } + + pub fn create_sink(&self, loc: impl Into) -> ParquetSink { + ParquetSink::from_obj_store(self.store.clone(), loc, self.opts.clone()) + } +} +impl SinkProducer for ParquetSinkProducer { + fn create_sink(&self, loc: ObjectPath) -> Box { + Box::new(self.create_sink(loc)) + } +} diff --git a/crates/datasources/src/common/sink/write/demux.rs b/crates/datasources/src/common/sink/write/demux.rs index 8b4167799..85536a944 100644 --- a/crates/datasources/src/common/sink/write/demux.rs +++ b/crates/datasources/src/common/sink/write/demux.rs @@ -30,7 +30,7 @@ use std::collections::HashMap; use std::sync::Arc; -use arrow_array::StringArray; +use datafusion::arrow::array::StringArray; use datafusion::arrow::array::AsArray; use datafusion::arrow::array::{StructArray, UInt64Builder}; use datafusion::arrow::compute::take; diff --git 
a/crates/datasources/src/common/sink/write/mod.rs b/crates/datasources/src/common/sink/write/mod.rs index 14a61be01..ee6398726 100644 --- a/crates/datasources/src/common/sink/write/mod.rs +++ b/crates/datasources/src/common/sink/write/mod.rs @@ -1,177 +1 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Module containing helper methods/traits related to enabling -//! write support for the various file formats -//! -//! -- NOTE -- -//! This code was originally sourced from: -//! Repo: https://github.com/apache/arrow-datafusion -//! Commit: ae882356171513c9d6c22b3bd966898fb4e8cac0 -//! Path: datafusion/core/src/datasource/file_format/write/mod.rs -//! Date: 10 Feb 2024 - -use std::io::{Error, Write}; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - - -use datafusion::datasource::file_format::file_compression_type::FileCompressionType; -use datafusion::error::{DataFusionError, Result}; - -use arrow_array::RecordBatch; - - -use bytes::Bytes; -use futures::future::BoxFuture; -use object_store::path::Path; -use object_store::{MultipartId, ObjectStore}; -use tokio::io::AsyncWrite; - -pub(crate) mod demux; -pub(crate) mod parquet; -// pub(crate) mod orchestration; - -/// A buffer with interior mutability shared by the SerializedFileWriter and -/// ObjectStore writer -#[derive(Clone)] -pub(crate) struct SharedBuffer { - /// The inner buffer for reading and writing - /// - /// The lock is used to obtain internal mutability, so no worry about the - /// lock contention. - pub(crate) buffer: Arc>>, -} - -impl SharedBuffer { - pub fn new(capacity: usize) -> Self { - Self { - buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))), - } - } -} - -impl Write for SharedBuffer { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let mut buffer = self.buffer.try_lock().unwrap(); - Write::write(&mut *buffer, buf) - } - - fn flush(&mut self) -> std::io::Result<()> { - let mut buffer = self.buffer.try_lock().unwrap(); - Write::flush(&mut *buffer) - } -} - -/// Stores data needed during abortion of MultiPart writers -#[derive(Clone)] -pub(crate) struct MultiPart { - /// A shared reference to the object store - store: Arc, - multipart_id: MultipartId, - location: Path, -} - -impl MultiPart { - /// Create a new `MultiPart` - pub fn new( - store: Arc, - multipart_id: MultipartId, - location: Path, - ) -> Self { - Self { - store, - multipart_id, - location, - } - } -} - -/// A wrapper struct with abort method and writer -pub(crate) struct AbortableWrite { - writer: W, - multipart: MultiPart, -} - -impl AbortableWrite { - /// Create a new `AbortableWrite` instance with the given writer, and write mode. 
- pub(crate) fn new(writer: W, multipart: MultiPart) -> Self { - Self { writer, multipart } - } - - /// handling of abort for different write modes - pub(crate) fn abort_writer(&self) -> Result>> { - let multi = self.multipart.clone(); - Ok(Box::pin(async move { - multi - .store - .abort_multipart(&multi.location, &multi.multipart_id) - .await - .map_err(DataFusionError::ObjectStore) - })) - } -} - -impl AsyncWrite for AbortableWrite { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_write(cx, buf) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_shutdown(cx) - } -} - -/// A trait that defines the methods required for a RecordBatch serializer. -pub trait BatchSerializer: Sync + Send { - /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes. - /// Parameter `initial` signals whether the given batch is the first batch. - /// This distinction is important for certain serializers (like CSV). - fn serialize(&self, batch: RecordBatch, initial: bool) -> Result; -} - -/// Returns an [`AbortableWrite`] which writes to the given object store location -/// with the specified compression -pub(crate) async fn create_writer( - file_compression_type: FileCompressionType, - location: &Path, - object_store: Arc, -) -> Result>> { - let (multipart_id, writer) = object_store - .put_multipart(location) - .await - .map_err(DataFusionError::ObjectStore)?; - Ok(AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - MultiPart::new(object_store, multipart_id, location.clone()), - )) -} +pub(crate) mod demux; \ No newline at end of file diff --git a/crates/datasources/src/common/sink/write/parquet.rs b/crates/datasources/src/common/sink/write/parquet.rs deleted file mode 100644 index 2a52c2969..000000000 --- a/crates/datasources/src/common/sink/write/parquet.rs +++ /dev/null @@ -1,330 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -use std::{result, sync::Arc}; - -use arrow_array::RecordBatch; -use arrow_schema::Schema; -use datafusion::{error::DataFusionError, parquet::{arrow::{arrow_to_parquet_schema, arrow_writer::{compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn}}, file::{properties::WriterProperties, writer::SerializedFileWriter}}}; -use tokio::{io::AsyncWrite, sync::mpsc::{self, Receiver, Sender}, task::JoinHandle}; -use tokio::io::AsyncWriteExt; -use super::{AbortableWrite, SharedBuffer}; - -/// Size of the buffer for [`AsyncArrowWriter`]. 
-const PARQUET_WRITER_BUFFER_SIZE: usize = 10485760; - -/// Initial writing buffer size. Note this is just a size hint for efficiency. It -/// will grow beyond the set value if needed. -const INITIAL_BUFFER_BYTES: usize = 1048576; - -/// When writing parquet files in parallel, if the buffered Parquet data exceeds -/// this size, it is flushed to object store -const BUFFER_FLUSH_BYTES: usize = 1024000; - -/// Result type for operations that could result in an [DataFusionError] -type Result = result::Result; - -/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them using an [ArrowColumnWriter] -/// Once the channel is exhausted, returns the ArrowColumnWriter. -async fn column_serializer_task( - mut rx: Receiver, - mut writer: ArrowColumnWriter, -) -> Result { - while let Some(col) = rx.recv().await { - writer.write(&col)?; - } - Ok(writer) -} - -type ColumnJoinHandle = JoinHandle>; -type ColSender = Sender; -/// Spawns a parallel serialization task for each column -/// Returns join handles for each columns serialization task along with a send channel -/// to send arrow arrays to each serialization task. -fn spawn_column_parallel_row_group_writer( - schema: Arc, - parquet_props: Arc, - max_buffer_size: usize, -) -> Result<(Vec, Vec)> { - let schema_desc = arrow_to_parquet_schema(&schema)?; - let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?; - let num_columns = col_writers.len(); - - let mut col_writer_handles = Vec::with_capacity(num_columns); - let mut col_array_channels = Vec::with_capacity(num_columns); - for writer in col_writers.into_iter() { - // Buffer size of this channel limits the number of arrays queued up for column level serialization - let (send_array, recieve_array) = - mpsc::channel::(max_buffer_size); - col_array_channels.push(send_array); - col_writer_handles - .push(tokio::spawn(column_serializer_task(recieve_array, writer))) - } - - Ok((col_writer_handles, col_array_channels)) -} - -/// Settings related to writing parquet files in parallel -#[derive(Clone)] -pub struct ParallelParquetWriterOptions { - pub max_parallel_row_groups: usize, - pub max_buffered_record_batches_per_stream: usize, -} - -/// This is the return type of calling [ArrowColumnWriter].close() on each column -/// i.e. the Vec of encoded columns which can be appended to a row group -type RBStreamSerializeResult = Result<(Vec, usize)>; - -/// Sends the ArrowArrays in passed [RecordBatch] through the channels to their respective -/// parallel column serializers. -async fn send_arrays_to_col_writers( - col_array_channels: &[ColSender], - rb: &RecordBatch, - schema: Arc, -) -> Result<()> { - // Each leaf column has its own channel, increment next_channel for each leaf column sent. - let mut next_channel = 0; - for (array, field) in rb.columns().iter().zip(schema.fields()) { - for c in compute_leaves(field, array)? 
{ - col_array_channels[next_channel] - .send(c) - .await - .map_err(|_| { - DataFusionError::Internal("Unable to send array to writer!".into()) - })?; - next_channel += 1; - } - } - - Ok(()) -} - -/// Spawns a tokio task which joins the parallel column writer tasks, -/// and finalizes the row group -fn spawn_rg_join_and_finalize_task( - column_writer_handles: Vec>>, - rg_rows: usize, -) -> JoinHandle { - tokio::spawn(async move { - let num_cols = column_writer_handles.len(); - let mut finalized_rg = Vec::with_capacity(num_cols); - for handle in column_writer_handles.into_iter() { - match handle.await { - Ok(r) => { - let w = r?; - finalized_rg.push(w.close()?); - } - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()) - } else { - unreachable!() - } - } - } - } - - Ok((finalized_rg, rg_rows)) - }) -} - -/// This task coordinates the serialization of a parquet file in parallel. -/// As the query produces RecordBatches, these are written to a RowGroup -/// via parallel [ArrowColumnWriter] tasks. Once the desired max rows per -/// row group is reached, the parallel tasks are joined on another separate task -/// and sent to a concatenation task. This task immediately continues to work -/// on the next row group in parallel. So, parquet serialization is parallelized -/// accross both columns and row_groups, with a theoretical max number of parallel tasks -/// given by n_columns * num_row_groups. -fn spawn_parquet_parallel_serialization_task( - mut data: Receiver, - serialize_tx: Sender>, - schema: Arc, - writer_props: Arc, - parallel_options: ParallelParquetWriterOptions, -) -> JoinHandle> { - tokio::spawn(async move { - let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream; - let max_row_group_rows = writer_props.max_row_group_size(); - let (mut column_writer_handles, mut col_array_channels) = - spawn_column_parallel_row_group_writer( - schema.clone(), - writer_props.clone(), - max_buffer_rb, - )?; - let mut current_rg_rows = 0; - - while let Some(rb) = data.recv().await { - if current_rg_rows + rb.num_rows() < max_row_group_rows { - send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone()) - .await?; - current_rg_rows += rb.num_rows(); - } else { - let rows_left = max_row_group_rows - current_rg_rows; - let a = rb.slice(0, rows_left); - send_arrays_to_col_writers(&col_array_channels, &a, schema.clone()) - .await?; - - // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup - // on a separate task, so that we can immediately start on the next RG before waiting - // for the current one to finish. 
- drop(col_array_channels); - let finalize_rg_task = spawn_rg_join_and_finalize_task( - column_writer_handles, - max_row_group_rows, - ); - - serialize_tx.send(finalize_rg_task).await.map_err(|_| { - DataFusionError::Internal( - "Unable to send closed RG to concat task!".into(), - ) - })?; - - let b = rb.slice(rows_left, rb.num_rows() - rows_left); - (column_writer_handles, col_array_channels) = - spawn_column_parallel_row_group_writer( - schema.clone(), - writer_props.clone(), - max_buffer_rb, - )?; - send_arrays_to_col_writers(&col_array_channels, &b, schema.clone()) - .await?; - current_rg_rows = b.num_rows(); - } - } - - drop(col_array_channels); - // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows - if current_rg_rows > 0 { - let finalize_rg_task = - spawn_rg_join_and_finalize_task(column_writer_handles, current_rg_rows); - - serialize_tx.send(finalize_rg_task).await.map_err(|_| { - DataFusionError::Internal( - "Unable to send closed RG to concat task!".into(), - ) - })?; - } - - Ok(()) - }) -} - -/// Consume RowGroups serialized by other parallel tasks and concatenate them in -/// to the final parquet file, while flushing finalized bytes to an [ObjectStore] -async fn concatenate_parallel_row_groups( - mut serialize_rx: Receiver>, - schema: Arc, - writer_props: Arc, - mut object_store_writer: AbortableWrite>, -) -> Result { - let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES); - - let schema_desc = arrow_to_parquet_schema(schema.as_ref())?; - let mut parquet_writer = SerializedFileWriter::new( - merged_buff.clone(), - schema_desc.root_schema_ptr(), - writer_props, - )?; - - let mut row_count = 0; - - while let Some(handle) = serialize_rx.recv().await { - let join_result = handle.await; - match join_result { - Ok(result) => { - let mut rg_out = parquet_writer.next_row_group()?; - let (serialized_columns, cnt) = result?; - row_count += cnt; - for chunk in serialized_columns { - chunk.append_to_row_group(&mut rg_out)?; - let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap(); - if buff_to_flush.len() > BUFFER_FLUSH_BYTES { - object_store_writer - .write_all(buff_to_flush.as_slice()) - .await?; - buff_to_flush.clear(); - } - } - rg_out.close()?; - } - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } else { - unreachable!(); - } - } - } - } - - let inner_writer = parquet_writer.into_inner()?; - let final_buff = inner_writer.buffer.try_lock().unwrap(); - - object_store_writer.write_all(final_buff.as_slice()).await?; - object_store_writer.shutdown().await?; - - Ok(row_count) -} - -/// Parallelizes the serialization of a single parquet file, by first serializing N -/// independent RecordBatch streams in parallel to RowGroups in memory. Another -/// task then stitches these independent RowGroups together and streams this large -/// single parquet file to an ObjectStore in multiple parts. 
-pub async fn output_single_parquet_file_parallelized( - object_store_writer: AbortableWrite>, - data: Receiver, - output_schema: Arc, - parquet_props: &WriterProperties, - parallel_options: ParallelParquetWriterOptions, -) -> Result { - let max_rowgroups = parallel_options.max_parallel_row_groups; - // Buffer size of this channel limits maximum number of RowGroups being worked on in parallel - let (serialize_tx, serialize_rx) = - mpsc::channel::>(max_rowgroups); - - let arc_props = Arc::new(parquet_props.clone()); - let launch_serialization_task = spawn_parquet_parallel_serialization_task( - data, - serialize_tx, - output_schema.clone(), - arc_props.clone(), - parallel_options, - ); - let row_count = concatenate_parallel_row_groups( - serialize_rx, - output_schema.clone(), - arc_props.clone(), - object_store_writer, - ) - .await?; - - match launch_serialization_task.await { - Ok(Ok(_)) => (), - Ok(Err(e)) => return Err(e), - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()) - } else { - unreachable!() - } - } - }; - - Ok(row_count) -} diff --git a/crates/sqlexec/src/planner/physical_plan/copy_to.rs b/crates/sqlexec/src/planner/physical_plan/copy_to.rs index 635c3ffbd..3729ffc10 100644 --- a/crates/sqlexec/src/planner/physical_plan/copy_to.rs +++ b/crates/sqlexec/src/planner/physical_plan/copy_to.rs @@ -19,7 +19,7 @@ use datasources::common::sink::bson::BsonSink; use datasources::common::sink::csv::{CsvSink, CsvSinkOpts}; use datasources::common::sink::json::{JsonSink, JsonSinkOpts}; use datasources::common::sink::lance::{LanceSink, LanceSinkOpts, LanceWriteParams}; -use datasources::common::sink::parquet::{ParquetSink, ParquetSinkOpts}; +use datasources::common::sink::parquet::{ParquetSinkProducer, ParquetSinkOpts}; use datasources::common::url::DatasourceUrl; use datasources::object_store::gcs::GcsStoreAccess; use datasources::object_store::generic::GenericStoreAccess; @@ -31,6 +31,7 @@ use object_store::azure::AzureConfigKey; use protogen::metastore::types::options::{ CopyToDestinationOptions, CopyToFormatOptions, StorageOptions, }; +use datasources::common::sink::hive_partitioning::{get_partition_columns_with_types, HivePartitionedSinkAdapter}; use super::{new_operation_with_count_batch, GENERIC_OPERATION_AND_COUNT_PHYSICAL_SCHEMA}; @@ -211,15 +212,18 @@ fn get_sink_for_obj( header: csv_opts.header, }, )), - CopyToFormatOptions::Parquet(parquet_opts) => Box::new(ParquetSink::from_obj_store( - store, - path, - ParquetSinkOpts { - row_group_size: parquet_opts.row_group_size, - partition_columns: get_partition_columns_with_types(source, parquet_opts.partition_columns)?, - output_schema: source.schema().clone(), - }, - )), + CopyToFormatOptions::Parquet(parquet_opts) => { + let schema = source.schema(); + Box::new(HivePartitionedSinkAdapter::new( + ParquetSinkProducer::from_obj_store(store, ParquetSinkOpts { + row_group_size: parquet_opts.row_group_size, + }), + get_partition_columns_with_types(&schema, parquet_opts.partition_columns)?, + location.into(), + ".parquet".to_string(), + schema, + )) + }, CopyToFormatOptions::Lance(opts) => { let wp = LanceWriteParams::default(); @@ -255,20 +259,3 @@ fn get_sink_for_obj( Ok(sink) } -fn get_partition_columns_with_types( - source: &Arc, - columns: Vec, -) -> DataFusionResult> { - let partition_columns: DataFusionResult> = columns - .iter() - .map(|col| { - source - .schema() - .field_with_name(col) - .map(|field| (field.name().to_owned(), field.data_type().to_owned())) - .map_err(|e| 
DataFusionError::External(Box::new(e))) - }) - .collect(); - - partition_columns -}
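
---

For context, a minimal sketch of how the lifted partitioning layer composes with a format-specific sink after this patch, mirroring the new wiring in `copy_to.rs`. Signatures are inferred from the hunks above (some generic parameters were lost in rendering), so treat the exact bounds and paths as approximate rather than authoritative; the helper function name is illustrative only.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::common::Result as DfResult;
use datafusion::physical_plan::insert::DataSink;
use datasources::common::sink::hive_partitioning::{
    get_partition_columns_with_types, HivePartitionedSinkAdapter,
};
use datasources::common::sink::parquet::{ParquetSinkOpts, ParquetSinkProducer};
use object_store::path::Path as ObjectPath;
use object_store::ObjectStore;

/// Build a hive-partitioned parquet sink for COPY TO (illustrative helper,
/// not part of the patch). `partition_by` may be empty, in which case the
/// adapter simply delegates to a single sink at `base_path`.
fn partitioned_parquet_sink(
    store: Arc<dyn ObjectStore>,
    base_path: ObjectPath,
    schema: Arc<Schema>,
    partition_by: Vec<String>,
) -> DfResult<Box<dyn DataSink>> {
    // Format-specific options stay with the format; partition columns no
    // longer live inside `ParquetSinkOpts`.
    let producer = ParquetSinkProducer::from_obj_store(store, ParquetSinkOpts::default());

    // Resolve the requested partition column names against the table schema.
    let partition_columns = get_partition_columns_with_types(&schema, partition_by)?;

    // The adapter demuxes the input stream into hive-style directories and
    // asks the producer for one underlying sink per output file.
    Ok(Box::new(HivePartitionedSinkAdapter::new(
        producer,
        partition_columns,
        base_path,
        ".parquet".to_string(),
        schema,
    )))
}
```

The same pattern should apply to any other `SinkProducer` implementation (CSV, JSON, etc.), which is the point of lifting partitioning above the file format: the adapter owns demuxing and writer-schema filtering, while each format's sink only ever sees a single output path.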