Skip to content

Commit

Permalink
wip: window udwf is working
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Aug 21, 2024
1 parent 4876fea commit c709df4
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 99 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions olap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ signal-hook = "0.3.17"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
tokio = { workspace = true, features = ["fs", "rt-multi-thread"] }
tracing.workspace = true

[dev-dependencies]
test-log.workspace = true
104 changes: 60 additions & 44 deletions olap/src/aggregator/ceramic_car.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use std::{any::Any, sync::Arc};

use arrow::{
array::{Array as _, BinaryBuilder, BooleanBufferBuilder, ListBuilder, StructArray},
array::{Array as _, BinaryBuilder, BooleanBufferBuilder, StringBuilder, StructArray},
datatypes::{DataType, Field, Fields},
};
use ceramic_event::unvalidated;
use cid::Cid;
use datafusion::{
common::cast::as_binary_array,
error::DataFusionError,
common::{cast::as_binary_array, exec_datafusion_err},
logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility},
};
use serde_json::Value;
use tracing::debug;
use tracing::{debug, instrument, Level};

/// UDF that extracts the car event data.
#[derive(Debug)]
Expand All @@ -29,31 +28,44 @@ impl CeramicCar {
Volatility::Immutable,
),
return_fields: vec![
Field::new("stream_cid", DataType::Binary, false),
Field::new("event_cid", DataType::Binary, false),
Field::new(
"multi_prev",
DataType::List(Field::new_list_field(DataType::Binary, true).into()),
false,
),
Field::new("payload", DataType::Binary, false),
Field::new("previous", DataType::Binary, true),
Field::new("data", DataType::Utf8, true),
]
.into(),
}
}
fn extract(car: &[u8]) -> anyhow::Result<Option<Commit>> {
#[instrument(skip(car), ret(level = Level::TRACE))]
fn extract(car: &[u8]) -> anyhow::Result<Commit> {
let (cid, event) = unvalidated::Event::<Value>::decode_car(car, false)?;
debug!(?event, "extract");
match event {
unvalidated::Event::Time(_time) => Ok(None),
unvalidated::Event::Time(time) => Ok(Commit {
stream_cid: time.id(),
cid,
previous: (Some(time.prev())),
data: None,
}),
unvalidated::Event::Signed(signed) => match signed.payload() {
unvalidated::Payload::Data(data) => Ok(Some(Commit {
unvalidated::Payload::Data(data) => Ok(Commit {
stream_cid: *data.id(),
cid,
prevs: (vec![*data.prev()]),
payload: (data.data().clone()),
})),
unvalidated::Payload::Init(_init) => Ok(None),
previous: (Some(*data.prev())),
data: Some(data.data().clone()),
}),
unvalidated::Payload::Init(init) => Ok(Commit {
stream_cid: cid,
cid,
previous: None,
data: init.data().cloned(),
}),
},
unvalidated::Event::Unsigned(_init) => Ok(None),
unvalidated::Event::Unsigned(init) => Ok(Commit {
stream_cid: cid,
cid,
previous: None,
data: init.data().cloned(),
}),
}
}
}
Expand All @@ -74,33 +86,35 @@ impl ScalarUDFImpl for CeramicCar {
fn invoke(&self, args: &[ColumnarValue]) -> datafusion::common::Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let cars = as_binary_array(&args[0])?;
let mut cids = BinaryBuilder::new();
let mut multi_prevs = ListBuilder::new(BinaryBuilder::new());
let mut payloads = BinaryBuilder::new();
let mut stream_cids = BinaryBuilder::new();
let mut event_cids = BinaryBuilder::new();
let mut prevs = BinaryBuilder::new();
let mut datas = StringBuilder::new();
let mut nulls = BooleanBufferBuilder::new(cars.len());
debug!(len = cars.len(), "extracting cars");
for car in cars {
if let Some(car) = car {
if let Some(Commit {
let Commit {
stream_cid,
cid,
prevs,
payload,
}) = CeramicCar::extract(car)
.map_err(|err| DataFusionError::Internal(err.to_string()))?
{
cids.append_value(cid.to_bytes());
for prev in prevs {
multi_prevs.values().append_value(prev.to_bytes())
}
multi_prevs.append(true);
payloads.append_value(
serde_json::to_vec(&payload)
.map_err(|err| DataFusionError::Internal(err.to_string()))?,
previous,
data,
} = CeramicCar::extract(car)
.map_err(|err| exec_datafusion_err!("Error extracting event: {err}"))?;
stream_cids.append_value(stream_cid.to_bytes());
event_cids.append_value(cid.to_bytes());
if let Some(prev) = previous {
prevs.append_value(prev.to_bytes())
} else {
prevs.append_null()
};
if let Some(data) = data {
datas.append_value(
serde_json::to_string(&data)
.map_err(|err| exec_datafusion_err!("Error JSON encoding: {err}"))?,
);
} else {
cids.append_null();
multi_prevs.append(false);
payloads.append_null();
datas.append_null();
}
nulls.append(false);
} else {
Expand All @@ -110,9 +124,10 @@ impl ScalarUDFImpl for CeramicCar {
Ok(ColumnarValue::Array(Arc::new(StructArray::new(
self.return_fields.clone(),
vec![
Arc::new(cids.finish()),
Arc::new(multi_prevs.finish()),
Arc::new(payloads.finish()),
Arc::new(stream_cids.finish()),
Arc::new(event_cids.finish()),
Arc::new(prevs.finish()),
Arc::new(datas.finish()),
],
Some(nulls.finish().into()),
))))
Expand All @@ -121,7 +136,8 @@ impl ScalarUDFImpl for CeramicCar {

#[derive(Debug)]
struct Commit {
stream_cid: Cid,
cid: Cid,
prevs: Vec<Cid>,
payload: Value,
previous: Option<Cid>,
data: Option<Value>,
}
129 changes: 110 additions & 19 deletions olap/src/aggregator/ceramic_patch.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use std::{any::Any, sync::Arc};
use std::{collections::HashMap, sync::Arc};

use arrow::{
array::BinaryArray,
datatypes::{DataType, Field},
array::{Array as _, ArrayRef, StringBuilder},
datatypes::DataType,
};
use datafusion::{
common::{cast::as_binary_array, Result},
logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility},
common::{
cast::{as_binary_array, as_string_array},
exec_datafusion_err, exec_err, Result,
},
logical_expr::{
PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl,
},
};

/// Applies a Ceramic data event to a document state returning the new document state.
Expand All @@ -16,38 +21,124 @@ pub struct CeramicPatch {
}

impl CeramicPatch {
pub fn new() -> Self {
pub fn new_udwf() -> WindowUDF {
WindowUDF::new_from_impl(Self::new())
}
fn new() -> Self {
Self {
signature: Signature::new(
TypeSignature::Exact(vec![
DataType::Binary,
DataType::List(Arc::new(Field::new_list_field(DataType::Binary, true))),
DataType::Binary,
DataType::Utf8,
DataType::Utf8,
]),
Volatility::Immutable,
),
}
}
}

impl ScalarUDFImpl for CeramicPatch {
fn as_any(&self) -> &dyn Any {
impl WindowUDFImpl for CeramicPatch {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn name(&self) -> &str {
"ceramic_patch"
}
fn signature(&self) -> &Signature {

fn signature(&self) -> &datafusion::logical_expr::Signature {
&self.signature
}
fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
Ok(DataType::Binary)

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
}

fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(CeramicPatchEvaluator))
}
}

#[derive(Debug)]
struct CeramicPatchEvaluator;

impl CeramicPatchEvaluator {
fn apply_patch(patch: &str, previous_state: &str) -> Result<String> {
let patch: Vec<json_patch::PatchOperation> = serde_json::from_str(patch)
.map_err(|err| exec_datafusion_err!("Error parsing patch: {err}"))?;
let mut new_state: serde_json::Value = serde_json::from_str(previous_state)
.map_err(|err| exec_datafusion_err!("Error parsing previous state: {err}"))?;
json_patch::patch(&mut new_state, &patch)
.map_err(|err| exec_datafusion_err!("Error applying JSON patch: {err}"))?;
serde_json::to_string(&new_state)
.map_err(|err| exec_datafusion_err!("Error JSON encoding: {err}"))
}
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let events = as_binary_array(&args[0])?;
let _states = as_binary_array(&args[0])?;
Ok(ColumnarValue::Array(Arc::new(
events.into_iter().collect::<BinaryArray>(),
)))
}

impl PartitionEvaluator for CeramicPatchEvaluator {
fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
let event_cids = as_binary_array(&values[0])?;
let previous_cids = as_binary_array(&values[1])?;
let previous_states = as_string_array(&values[2])?;
let patches = as_string_array(&values[3])?;
// TODO: avoid using String here
let mut new_states: HashMap<&[u8], String> = HashMap::new();
// Repeatedly iterate events until we have determined the new state of all events.
loop {
let mut progress = false;
for i in 0..num_rows {
let cid = event_cids.value(i);
if new_states.contains_key(cid) {
// We already determine this event's new state
continue;
}
if previous_cids.is_null(i) {
//Init event, patch value is the initial state
if patches.is_null(i) {
// If we have an init event without data use an empty object as the initial
// state. This feels like a leaky abstraction is this expected based on the
// Ceramic spec?
new_states.insert(cid, "{}".to_string());
} else {
new_states.insert(cid, patches.value(i).to_string());
}
progress = true;
} else if let Some(previous_state) = if !previous_states.is_null(i) {
Some(previous_states.value(i))
} else {
new_states.get(previous_cids.value(i)).map(String::as_str)
} {
let new_state = if patches.is_null(i) {
//We have a time event, new state is just the previous state
previous_state.to_string()
} else {
CeramicPatchEvaluator::apply_patch(patches.value(i), previous_state)?
};
new_states.insert(cid, new_state);
progress = true;
}
}
if !progress {
//TODO provide summary data about events that do not have a previous
return exec_err!("broken chain, missing events");
}
if new_states.len() == num_rows {
break;
}
}
// Construct arrow array from all new states
let size = new_states.values().map(String::len).sum();
let mut new_states_arr = StringBuilder::with_capacity(new_states.len(), size);
for i in 0..num_rows {
let cid = event_cids.value(i);
new_states_arr.append_value(
new_states
.get(&cid)
.ok_or_else(|| exec_datafusion_err!("invalid conclusion events found"))?,
);
}
Ok(Arc::new(new_states_arr.finish()))
}
}
Loading

0 comments on commit c709df4

Please sign in to comment.