-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add olap aggregation function #527
Conversation
d30a9fa
to
c876162
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few questions about the evaluate_all code.
I believe using topological sort would be a neater solution than the current iterative + hashmap approach.
Also concerned about termination during cycles in the current code. We might need to handle that and a return exec error
@samika98 I have updated the algo to do a single pass. |
c0af2c6
to
3ffd303
Compare
The aggregation function compute the stream state for each new event into a stream. All stream states are persisted into a parquet database.
75c98de
to
955fc46
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the test. Some questions as it's a bit dense to those of us uninitiated with datafusion (their docs seem really good though so I just need to spend some time reading them).
} | ||
|
||
impl PartitionEvaluator for CeramicPatchEvaluator { | ||
fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main thing I struggled with was understanding the where things are coming from and what we're actually working with. The datafusion docs are pretty good for how this trait is used but but including a doc comment about what the expected input looks like would be helpful. Then again, the assignments into the fields makes it fairly clear, so maybe it's not useful 🤷
(these assignments)
let event_cids = as_binary_array(&values[0])?;
let previous_cids = as_binary_array(&values[1])?;
let previous_states = as_string_array(&values[2])?;
let patches = as_string_array(&values[3])?;
samika is OOO and we discussed changes already in person
The aggregation function compute the stream state for each new event into a stream. All stream states are persisted into a parquet database.
TODO:
Closes AES-289
Closes AES-290