Skip to content

Commit

Permalink
fix: use single pass inside ceramic_patch
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Sep 16, 2024
1 parent 82af554 commit 34c8bce
Showing 1 changed file with 57 additions and 54 deletions.
111 changes: 57 additions & 54 deletions olap/src/aggregator/ceramic_patch.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use arrow::{
array::{Array as _, ArrayRef, StringBuilder},
array::{Array as _, ArrayBuilder as _, ArrayRef, StringBuilder},
datatypes::DataType,
};
use datafusion::{
common::{
cast::{as_binary_array, as_string_array},
exec_datafusion_err, exec_err, Result,
exec_datafusion_err, Result,
},
logical_expr::{
PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl,
Expand Down Expand Up @@ -83,62 +83,65 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
let previous_cids = as_binary_array(&values[1])?;
let previous_states = as_string_array(&values[2])?;
let patches = as_string_array(&values[3])?;
// TODO: avoid using String here
let mut new_states: HashMap<&[u8], String> = HashMap::new();
// Repeatedly iterate events until we have determined the new state of all events.
loop {
let mut progress = false;
for i in 0..num_rows {
let cid = event_cids.value(i);
if new_states.contains_key(cid) {
// We already determine this event's new state
continue;
let mut new_states = StringBuilder::new();
for i in 0..num_rows {
if previous_cids.is_null(i) {
//Init event, patch value is the initial state
if patches.is_null(i) {
// If we have an init event without data, use an empty object as the initial
// state. This feels like a leaky abstraction; is this expected based on the
// Ceramic spec?
new_states.append_value("{}");
} else {
new_states.append_value(patches.value(i));
}
if previous_cids.is_null(i) {
//Init event, patch value is the initial state
if patches.is_null(i) {
// If we have an init event without data use an empty object as the initial
// state. This feels like a leaky abstraction is this expected based on the
// Ceramic spec?
new_states.insert(cid, "{}".to_string());
} else {
new_states.insert(cid, patches.value(i).to_string());
} else if let Some(previous_state) = if !previous_states.is_null(i) {
// We know the previous state already
Some(previous_states.value(i))
} else {
// Iterate backwards until we find the previous state among the new states.
let previous_cid = previous_cids.value(i);
let mut j = i;
loop {
if j == 0 {
break None;
}
j -= 1;
if event_cids.value(j) == previous_cid {
break Some(value_at(&new_states, j));
}
progress = true;
} else if let Some(previous_state) = if !previous_states.is_null(i) {
Some(previous_states.value(i))
}
} {
if patches.is_null(i) {
// We have a time event, new state is just the previous state
//
// Allow clippy warning as previous_state is a reference back into new_states.
// So we need to copy the data to a new location before we can copy it back
// into the new_states.
#[allow(clippy::unnecessary_to_owned)]
new_states.append_value(previous_state.to_string());
} else {
new_states.get(previous_cids.value(i)).map(String::as_str)
} {
let new_state = if patches.is_null(i) {
//We have a time event, new state is just the previous state
previous_state.to_string()
} else {
CeramicPatchEvaluator::apply_patch(patches.value(i), previous_state)?
};
new_states.insert(cid, new_state);
progress = true;
new_states.append_value(CeramicPatchEvaluator::apply_patch(
patches.value(i),
previous_state,
)?);
}
} else {
// Unreachable when data is well formed.
new_states.append_null();
}
if !progress {
//TODO provide summary data about events that do not have a previous
return exec_err!("broken chain, missing events");
}
if new_states.len() == num_rows {
break;
}
}
// Construct arrow array from all new states
let size = new_states.values().map(String::len).sum();
let mut new_states_arr = StringBuilder::with_capacity(new_states.len(), size);
for i in 0..num_rows {
let cid = event_cids.value(i);
new_states_arr.append_value(
new_states
.get(&cid)
.ok_or_else(|| exec_datafusion_err!("invalid conclusion events found"))?,
);
}
Ok(Arc::new(new_states_arr.finish()))
Ok(Arc::new(new_states.finish()))
}
}

/// Borrows the string stored at slot `idx` of a `StringBuilder` that is still being built.
///
/// The start of the value is read from `offsets_slice()[idx]`. When `idx` is less than
/// `builder.len()` the end is the next offset; otherwise the end is the length of the
/// values buffer.
///
/// NOTE(review): the builder's validity (null) information is not consulted here, so a
/// slot written with `append_null` presumably reads back as an empty string — confirm
/// against the arrow builder implementation.
///
/// # Panics
///
/// Panics if an offset index is out of bounds, if the byte range is not a valid slice of
/// the values buffer, or if the selected bytes are not valid UTF-8.
fn value_at(builder: &StringBuilder, idx: usize) -> &str {
    let offsets = builder.offsets_slice();
    let begin = offsets[idx] as usize;
    let data = builder.values_slice();
    let end = if idx >= builder.len() {
        // No following offset is read for this slot; the value runs to the end of the data.
        data.len()
    } else {
        offsets[idx + 1] as usize
    };
    let bytes = &data[begin..end];
    std::str::from_utf8(bytes).expect("new states should always be valid utf8")
}

0 comments on commit 34c8bce

Please sign in to comment.