lift partitioning above file format
melbourne2991 committed Feb 16, 2024
1 parent 4d4073a commit 8317fbc
Showing 9 changed files with 243 additions and 671 deletions.
1 change: 0 additions & 1 deletion Cargo.lock


1 change: 0 additions & 1 deletion crates/datasources/Cargo.toml
@@ -13,7 +13,6 @@ apache-avro = "0.16"
async-channel = "2.1.1"
async-stream = "0.3.5"
async-trait = { workspace = true }
arrow-array = { version = "50.0.0", default-features = false }
arrow-schema = { version = "50.0.0", default-features = false }
bigquery-storage = { git = "https://github.com/glaredb/bigquery-storage", branch = "deps/2023-10-27-update" }
bitvec = "1"
194 changes: 194 additions & 0 deletions crates/datasources/src/common/sink/hive_partitioning.rs
@@ -0,0 +1,194 @@
use std::{any::Any, fmt, sync::Arc};
use async_trait::async_trait;
use datafusion::arrow::datatypes::{DataType, Schema};
use datafusion::common::Result as DfResult;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::{
    execution::TaskContext,
    physical_plan::{
        insert::DataSink, metrics::MetricsSet, DisplayAs, DisplayFormatType,
        SendableRecordBatchStream,
    },
};
use object_store::path::Path as ObjectPath;
use tokio::task::JoinSet;

use crate::common::sink::write::demux::start_demuxer_task;

/// A data sink factory used to create a sink for a given path.
pub trait SinkProducer: std::fmt::Debug + Send + Sync {
    fn create_sink(&self, path: ObjectPath) -> Box<dyn DataSink>;
}
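
To make the `SinkProducer` contract concrete, here is a minimal sketch of an implementation; `CsvSink` and its `new` constructor are hypothetical stand-ins for whichever concrete format sink (CSV, Parquet, BSON, ...) gets wired in:

// Sketch of a producer for a single format. `CsvSink::new` is assumed here
// for illustration; the real sink constructors live in the sibling format
// modules of this crate.
#[derive(Debug)]
struct CsvSinkProducer {
    store: Arc<dyn object_store::ObjectStore>,
}

impl SinkProducer for CsvSinkProducer {
    // Build a fresh CSV sink that writes to the given object-store path.
    fn create_sink(&self, path: ObjectPath) -> Box<dyn DataSink> {
        Box::new(CsvSink::new(self.store.clone(), path))
    }
}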

/// A data sink that takes a stream of record batches and writes them to a
/// hive-partitioned directory structure, delegating creation of the
/// underlying sinks to a `SinkProducer`.
#[derive(Debug)]
pub struct HivePartitionedSinkAdapter<S: SinkProducer> {
    producer: S,
    partition_columns: Vec<(String, DataType)>,
    base_output_path: ObjectPath,
    file_extension: String,
    schema: Arc<Schema>,
}

impl<S: SinkProducer> HivePartitionedSinkAdapter<S> {
    pub fn new(
        producer: S,
        partition_columns: Vec<(String, DataType)>,
        base_output_path: ObjectPath,
        file_extension: String,
        schema: Arc<Schema>,
    ) -> Self {
        HivePartitionedSinkAdapter {
            producer,
            partition_columns,
            base_output_path,
            file_extension,
            schema,
        }
    }

    /// The schema handed to the file writers: the input schema minus the
    /// partition columns, whose values are encoded in the directory path
    /// rather than stored in the files.
    fn get_writer_schema(&self) -> Arc<Schema> {
        if !self.partition_columns.is_empty() {
            let partition_names: Vec<_> =
                self.partition_columns.iter().map(|(s, _)| s).collect();

            Arc::new(Schema::new(
                self.schema
                    .fields()
                    .iter()
                    .filter(|f| !partition_names.contains(&f.name()))
                    .map(|f| (**f).clone())
                    .collect::<Vec<_>>(),
            ))
        } else {
            self.schema.clone()
        }
    }
}
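
As an illustration (column names made up, not part of this commit): partitioning on `region` leaves only the remaining fields in the writer schema, while the `region` value is carried by paths like `region=eu/part-0.parquet`. A small test-style sketch:

// Illustration with invented columns; NoopProducer exists only to satisfy
// the constructor and is never asked to create a sink here.
#[cfg(test)]
mod writer_schema_example {
    use super::*;
    use datafusion::arrow::datatypes::Field;

    #[derive(Debug)]
    struct NoopProducer;

    impl SinkProducer for NoopProducer {
        fn create_sink(&self, _path: ObjectPath) -> Box<dyn DataSink> {
            unimplemented!("illustration only")
        }
    }

    #[test]
    fn partition_columns_are_dropped() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("region", DataType::Utf8, false),
            Field::new("amount", DataType::Int64, false),
        ]));
        let adapter = HivePartitionedSinkAdapter::new(
            NoopProducer,
            vec![("region".to_string(), DataType::Utf8)],
            ObjectPath::from("out/table"),
            ".parquet".to_string(),
            schema,
        );
        // Only "amount" survives; "region" moves into the directory path.
        let writer_schema = adapter.get_writer_schema();
        assert_eq!(writer_schema.fields().len(), 1);
        assert_eq!(writer_schema.field(0).name(), "amount");
    }
}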

impl<S: SinkProducer> fmt::Display for HivePartitionedSinkAdapter<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: display more details
        write!(f, "HivePartitionedSinkAdapter()")
    }
}

impl<S: SinkProducer> DisplayAs for HivePartitionedSinkAdapter<S> {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default => write!(f, "{self}"),
            DisplayFormatType::Verbose => write!(f, "{self}"),
        }
    }
}

#[async_trait]
impl<S: SinkProducer + 'static> DataSink for HivePartitionedSinkAdapter<S> {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    async fn write_all(
        &self,
        stream: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> DfResult<u64> {
        // Without partition columns there is nothing to demux: write the
        // whole stream through a single sink rooted at the base output path.
        if self.partition_columns.is_empty() {
            let sink = self.producer.create_sink(self.base_output_path.clone());
            return sink.write_all(stream, context).await;
        }

        // Route the incoming stream into per-file channels: the demuxer
        // yields a (path, receiver) pair for each hive-partitioned output
        // file it decides to create.
        let (demux_task, mut file_stream_rx) = start_demuxer_task(
            stream,
            context,
            Some(self.partition_columns.clone()),
            self.base_output_path.clone(),
            self.file_extension.clone(),
        );

        let mut sink_write_tasks: JoinSet<DfResult<usize>> = JoinSet::new();
        let writer_schema = self.get_writer_schema();

        while let Some((path, mut rx)) = file_stream_rx.recv().await {
            let ctx = context.clone();
            let sink = self.producer.create_sink(path);

            // Adapt the receiver for this file's batches into a stream, then
            // wrap it with the writer schema (partition columns removed) so
            // the sink sees a proper SendableRecordBatchStream.
            let stream = async_stream::stream! {
                while let Some(item) = rx.recv().await {
                    yield Ok(item);
                }
            };

            let record_batch_stream =
                Box::pin(RecordBatchStreamAdapter::new(writer_schema.clone(), stream));

            sink_write_tasks.spawn(async move {
                sink.write_all(record_batch_stream, &ctx)
                    .await
                    .map(|row_count| row_count as usize)
            });
        }

        let mut row_count = 0;

        // Sum the row counts from the writer tasks, resuming any panic. A
        // non-panic JoinError would mean cancellation, which cannot happen
        // here because the JoinSet is never aborted.
        while let Some(result) = sink_write_tasks.join_next().await {
            match result {
                Ok(r) => {
                    row_count += r?;
                }
                Err(e) => {
                    if e.is_panic() {
                        std::panic::resume_unwind(e.into_panic());
                    } else {
                        unreachable!();
                    }
                }
            }
        }

        // Wait for the demuxer itself and surface any error it hit while
        // reading the input stream.
        match demux_task.await {
            Ok(r) => r?,
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }

        Ok(row_count as u64)
    }
}

/// Get the partition columns with their types from the schema.
pub fn get_partition_columns_with_types(
    schema: &Schema,
    columns: Vec<String>,
) -> DfResult<Vec<(String, DataType)>> {
    columns
        .iter()
        .map(|col| {
            schema
                .field_with_name(col)
                .map(|field| (field.name().to_owned(), field.data_type().to_owned()))
                .map_err(|e| DataFusionError::External(Box::new(e)))
        })
        .collect()
}
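
Putting the pieces together, a caller might wire the adapter up roughly as follows; this is a sketch, with the partition column name, output path, and file extension invented for illustration:

// Sketch only: resolve partition column types, then wrap any SinkProducer
// in the hive-partitioning adapter. Names below are illustrative.
fn build_partitioned_sink<S: SinkProducer + 'static>(
    producer: S,
    schema: Arc<Schema>,
) -> DfResult<HivePartitionedSinkAdapter<S>> {
    // Fails early if a partition column is missing from the schema.
    let partition_columns =
        get_partition_columns_with_types(&schema, vec!["region".to_string()])?;

    Ok(HivePartitionedSinkAdapter::new(
        producer,
        partition_columns,
        ObjectPath::from("out/table"),
        ".parquet".to_string(),
        schema,
    ))
}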

1 change: 1 addition & 0 deletions crates/datasources/src/common/sink/mod.rs
@@ -1,4 +1,5 @@
mod write;
pub mod hive_partitioning;

pub mod bson;
pub mod csv;
