Skip to content

Commit

Permalink
refactor: small logic change for readability (#534)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc authored Sep 18, 2024
1 parent 4cd3cd0 commit 669915d
Showing 1 changed file with 55 additions and 35 deletions.
90 changes: 55 additions & 35 deletions olap/src/aggregator/ceramic_patch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,68 @@ impl CeramicPatchEvaluator {
}

impl PartitionEvaluator for CeramicPatchEvaluator {
// Compute the new state of each document for a batch of events.
// Produces num_rows new document states, i.e. one for each input event.
//
// Assumption made by the function:
// * Window partitions are by stream_cid
// * Rows are ordered by the index column
//
// With these assumptions the code assumes it has all events for a stream and only events from
// a single stream.
// Additionally index sort order means that any event's previous event comes earlier in the
// data set and so a single pass algorithm can be implemented.
//
// Input data must have the following columns:
// * event_cid - unique id of the event
// * previous_cid - id of the previous event, nullable implies an init event.
// * previous_state - state of the previous event, nullable implies that the previous event
// exists in the current dataset.
// * patch - json patch to apply
fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
let event_cids = as_binary_array(&values[0])?;
let previous_cids = as_binary_array(&values[1])?;
let previous_states = as_string_array(&values[2])?;
let patches = as_string_array(&values[3])?;
let mut new_states = StringBuilder::new();
for i in 0..num_rows {
if previous_cids.is_null(i) {
if previous_cids.is_valid(i) {
if let Some(previous_state) = if !previous_states.is_null(i) {
// We know the previous state already
Some(previous_states.value(i))
} else {
// Iterator backwards till we find the previous state among the new states.
let previous_cid = previous_cids.value(i);
let mut j = i;
loop {
if j == 0 {
break None;
}
j -= 1;
if event_cids.value(j) == previous_cid {
break Some(value_at(&new_states, j));
}
}
} {
if patches.is_null(i) {
// We have a time event, new state is just the previous state
//
// Allow clippy warning as previous_state is a reference back into new_states.
// So we need to copy the data to a new location before we can copy it back
// into the new_states.
#[allow(clippy::unnecessary_to_owned)]
new_states.append_value(previous_state.to_string());
} else {
new_states.append_value(CeramicPatchEvaluator::apply_patch(
patches.value(i),
previous_state,
)?);
}
} else {
// Unreachable when data is well formed.
new_states.append_null();
}
} else {
//Init event, patch value is the initial state
if patches.is_null(i) {
// If we have an init event without data use an empty object as the initial
Expand All @@ -95,40 +149,6 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
} else {
new_states.append_value(patches.value(i));
}
} else if let Some(previous_state) = if !previous_states.is_null(i) {
// We know the previous state already
Some(previous_states.value(i))
} else {
// Iterator backwards till we find the previous state among the new states.
let previous_cid = previous_cids.value(i);
let mut j = i;
loop {
if j == 0 {
break None;
}
j -= 1;
if event_cids.value(j) == previous_cid {
break Some(value_at(&new_states, j));
}
}
} {
if patches.is_null(i) {
// We have a time event, new state is just the previous state
//
// Allow clippy warning as previous_state is a reference back into new_states.
// So we need to copy the data to a new location before we can copy it back
// into the new_states.
#[allow(clippy::unnecessary_to_owned)]
new_states.append_value(previous_state.to_string());
} else {
new_states.append_value(CeramicPatchEvaluator::apply_patch(
patches.value(i),
previous_state,
)?);
}
} else {
// Unreachable when data is well formed.
new_states.append_null();
}
}
Ok(Arc::new(new_states.finish()))
Expand Down

0 comments on commit 669915d

Please sign in to comment.