forked from delta-io/delta-rs
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: introduce CDC write-side support for the Update operations
This change introduces a `CDCTracker` which helps collect changes during merges and update. This is admittedly rather inefficient, but my hope is that this provides a place to start iterating and improving upon the writer code There is still additional work which needs to be done to handle table features properly for other code paths (see the middleware discussion we have had in Slack) but this produces CDC files for Update operations Fixes delta-io#604 Fixes delta-io#2095
- Loading branch information
Showing
9 changed files
with
675 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,316 @@ | ||
//! | ||
//! The CDC module contains private tools for managing CDC files | ||
//! | ||
|
||
use crate::DeltaResult; | ||
|
||
use arrow::array::{Array, StringArray}; | ||
use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; | ||
use arrow::record_batch::RecordBatch; | ||
use datafusion::error::Result as DataFusionResult; | ||
use datafusion::physical_plan::{ | ||
metrics::MetricsSet, DisplayAs, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, | ||
}; | ||
use datafusion::prelude::*; | ||
use futures::{Stream, StreamExt}; | ||
use std::sync::Arc; | ||
use tokio::sync::mpsc::*; | ||
use tracing::log::*; | ||
|
||
/// Maximum in-memory channel size for the tracker to use | ||
const MAX_CHANNEL_SIZE: usize = 1024; | ||
|
||
/// The CDCTracker is useful for hooking reads/writes in a manner nececessary to create CDC files | ||
/// associated with commits | ||
pub(crate) struct CDCTracker { | ||
schema: SchemaRef, | ||
pre_sender: Sender<RecordBatch>, | ||
pre_receiver: Receiver<RecordBatch>, | ||
post_sender: Sender<RecordBatch>, | ||
post_receiver: Receiver<RecordBatch>, | ||
} | ||
|
||
impl CDCTracker { | ||
/// construct | ||
pub(crate) fn new(schema: SchemaRef) -> Self { | ||
let (pre_sender, pre_receiver) = channel(MAX_CHANNEL_SIZE); | ||
let (post_sender, post_receiver) = channel(MAX_CHANNEL_SIZE); | ||
Self { | ||
schema, | ||
pre_sender, | ||
pre_receiver, | ||
post_sender, | ||
post_receiver, | ||
} | ||
} | ||
|
||
/// Return an owned [Sender] for the caller to use when sending read but not altered batches | ||
pub(crate) fn pre_sender(&self) -> Sender<RecordBatch> { | ||
self.pre_sender.clone() | ||
} | ||
|
||
/// Return an owned [Sender][ for the caller to use when sending altered batches | ||
pub(crate) fn post_sender(&self) -> Sender<RecordBatch> { | ||
self.post_sender.clone() | ||
} | ||
|
||
pub(crate) async fn collect(mut self) -> DeltaResult<Vec<RecordBatch>> { | ||
debug!("Collecting all the batches for diffing"); | ||
let ctx = SessionContext::new(); | ||
let mut pre = vec![]; | ||
let mut post = vec![]; | ||
|
||
while !self.pre_receiver.is_empty() { | ||
if let Ok(batch) = self.pre_receiver.try_recv() { | ||
pre.push(batch); | ||
} else { | ||
warn!("Error when receiving on the pre-receiver"); | ||
} | ||
} | ||
|
||
while !self.post_receiver.is_empty() { | ||
if let Ok(batch) = self.post_receiver.try_recv() { | ||
post.push(batch); | ||
} else { | ||
warn!("Error when receiving on the post-receiver"); | ||
} | ||
} | ||
|
||
// Collect _all_ the batches for consideration | ||
let pre = ctx.read_batches(pre)?; | ||
let post = ctx.read_batches(post)?; | ||
|
||
// There is certainly a better way to do this other than stupidly cloning data for diffing | ||
// purposes, but this is the quickest and easiest way to "diff" the two sets of batches | ||
let preimage = pre.clone().except(post.clone())?; | ||
let postimage = post.except(pre)?; | ||
|
||
// Create a new schema which represents the input batch along with the CDC | ||
// columns | ||
let mut fields: Vec<Arc<Field>> = self.schema.fields().to_vec().clone(); | ||
fields.push(Arc::new(Field::new("_change_type", DataType::Utf8, true))); | ||
let schema = Arc::new(Schema::new(fields)); | ||
|
||
let mut batches = vec![]; | ||
|
||
let mut pre_stream = preimage.execute_stream().await?; | ||
let mut post_stream = postimage.execute_stream().await?; | ||
|
||
// Fill up on pre image batches | ||
while let Some(Ok(batch)) = pre_stream.next().await { | ||
let batch = crate::operations::cast::cast_record_batch( | ||
&batch, | ||
self.schema.clone(), | ||
true, | ||
false, | ||
)?; | ||
let new_column = Arc::new(StringArray::from(vec![ | ||
Some("update_preimage"); | ||
batch.num_rows() | ||
])); | ||
let mut columns: Vec<Arc<dyn Array>> = batch.columns().to_vec(); | ||
columns.push(new_column); | ||
|
||
let batch = RecordBatch::try_new(schema.clone(), columns)?; | ||
batches.push(batch); | ||
} | ||
|
||
// Fill up on the post-image batches | ||
while let Some(Ok(batch)) = post_stream.next().await { | ||
let batch = crate::operations::cast::cast_record_batch( | ||
&batch, | ||
self.schema.clone(), | ||
true, | ||
false, | ||
)?; | ||
let new_column = Arc::new(StringArray::from(vec![ | ||
Some("update_postimage"); | ||
batch.num_rows() | ||
])); | ||
let mut columns: Vec<Arc<dyn Array>> = batch.columns().to_vec(); | ||
columns.push(new_column); | ||
|
||
let batch = RecordBatch::try_new(schema.clone(), columns)?; | ||
batches.push(batch); | ||
} | ||
|
||
debug!("Found {} batches to consider `CDC` data", batches.len()); | ||
|
||
// At this point the batches should just contain the changes | ||
Ok(batches) | ||
} | ||
} | ||
|
||
/// A DataFusion observer to help pick up on pre-image changes | ||
pub(crate) struct CDCObserver { | ||
parent: Arc<dyn ExecutionPlan>, | ||
id: String, | ||
sender: Sender<RecordBatch>, | ||
} | ||
|
||
impl CDCObserver { | ||
pub(crate) fn new( | ||
id: String, | ||
sender: Sender<RecordBatch>, | ||
parent: Arc<dyn ExecutionPlan>, | ||
) -> Self { | ||
Self { id, sender, parent } | ||
} | ||
} | ||
|
||
impl std::fmt::Debug for CDCObserver { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.debug_struct("CDCObserver").field("id", &self.id).finish() | ||
} | ||
} | ||
|
||
impl DisplayAs for CDCObserver { | ||
fn fmt_as( | ||
&self, | ||
_: datafusion::physical_plan::DisplayFormatType, | ||
f: &mut std::fmt::Formatter, | ||
) -> std::fmt::Result { | ||
write!(f, "CDCObserver id={}", self.id) | ||
} | ||
} | ||
|
||
impl ExecutionPlan for CDCObserver { | ||
fn as_any(&self) -> &dyn std::any::Any { | ||
self | ||
} | ||
|
||
fn schema(&self) -> SchemaRef { | ||
self.parent.schema() | ||
} | ||
|
||
fn properties(&self) -> &datafusion::physical_plan::PlanProperties { | ||
self.parent.properties() | ||
} | ||
|
||
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { | ||
vec![self.parent.clone()] | ||
} | ||
|
||
fn execute( | ||
&self, | ||
partition: usize, | ||
context: Arc<datafusion::execution::context::TaskContext>, | ||
) -> datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> { | ||
let res = self.parent.execute(partition, context)?; | ||
Ok(Box::pin(CDCObserverStream { | ||
schema: self.schema(), | ||
input: res, | ||
sender: self.sender.clone(), | ||
})) | ||
} | ||
|
||
fn statistics(&self) -> DataFusionResult<datafusion_common::Statistics> { | ||
self.parent.statistics() | ||
} | ||
|
||
fn with_new_children( | ||
self: Arc<Self>, | ||
children: Vec<Arc<dyn ExecutionPlan>>, | ||
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> { | ||
if let Some(parent) = children.first() { | ||
Ok(Arc::new(CDCObserver { | ||
id: self.id.clone(), | ||
sender: self.sender.clone(), | ||
parent: parent.clone(), | ||
})) | ||
} else { | ||
Err(datafusion_common::DataFusionError::Internal( | ||
"Failed to handle CDCObserver".into(), | ||
)) | ||
} | ||
} | ||
|
||
fn metrics(&self) -> Option<MetricsSet> { | ||
self.parent.metrics() | ||
} | ||
} | ||
|
||
/// The CDCObserverStream simply acts to help observe the stream of data being | ||
/// read by DataFusion to capture the pre-image versions of data | ||
pub(crate) struct CDCObserverStream { | ||
schema: SchemaRef, | ||
input: SendableRecordBatchStream, | ||
sender: Sender<RecordBatch>, | ||
} | ||
|
||
impl Stream for CDCObserverStream { | ||
type Item = DataFusionResult<RecordBatch>; | ||
|
||
fn poll_next( | ||
mut self: std::pin::Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Option<Self::Item>> { | ||
self.input.poll_next_unpin(cx).map(|x| match x { | ||
Some(Ok(batch)) => { | ||
let _ = self.sender.try_send(batch.clone()); | ||
Some(Ok(batch)) | ||
} | ||
other => other, | ||
}) | ||
} | ||
|
||
fn size_hint(&self) -> (usize, Option<usize>) { | ||
self.input.size_hint() | ||
} | ||
} | ||
|
||
impl RecordBatchStream for CDCObserverStream { | ||
fn schema(&self) -> SchemaRef { | ||
self.schema.clone() | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use arrow::array::Int32Array; | ||
use datafusion::assert_batches_sorted_eq; | ||
|
||
#[tokio::test] | ||
async fn test_sanity_check() { | ||
let schema = Arc::new(Schema::new(vec![Field::new( | ||
"value", | ||
DataType::Int32, | ||
true, | ||
)])); | ||
let tracker = CDCTracker::new(schema.clone()); | ||
|
||
let batch = RecordBatch::try_new( | ||
Arc::clone(&schema), | ||
vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))], | ||
) | ||
.unwrap(); | ||
let updated_batch = RecordBatch::try_new( | ||
Arc::clone(&schema), | ||
vec![Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)]))], | ||
) | ||
.unwrap(); | ||
|
||
let _ = tracker.pre_sender().send(batch).await; | ||
let _ = tracker.post_sender().send(updated_batch).await; | ||
|
||
match tracker.collect().await { | ||
Ok(batches) => { | ||
let _ = arrow::util::pretty::print_batches(&batches); | ||
assert_eq!(batches.len(), 2); | ||
assert_batches_sorted_eq! {[ | ||
"+-------+------------------+", | ||
"| value | _change_type |", | ||
"+-------+------------------+", | ||
"| 2 | update_preimage |", | ||
"| 12 | update_postimage |", | ||
"+-------+------------------+", | ||
], &batches } | ||
} | ||
Err(err) => { | ||
println!("err: {err:#?}"); | ||
panic!("Should have never reached this assertion"); | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -174,6 +174,7 @@ async fn excute_non_empty_expr( | |
false, | ||
None, | ||
writer_stats_config, | ||
None, | ||
) | ||
.await? | ||
.into_iter() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1389,6 +1389,7 @@ async fn execute( | |
safe_cast, | ||
None, | ||
writer_stats_config, | ||
None, | ||
) | ||
.await?; | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.