feat: aes 291 continuous queries in olap aggregator #538
olap/src/aggregator/mod.rs
Outdated
/// Represents a processor for continuous stream processing of conclusion feed data.
struct ContinuousStreamProcessor {
    ctx: SessionContext,
    last_processed_index: Cell<u64>,
Why is last_processed_index inside a Cell? Why not accept a &mut self in the process_batch method?
We do not want ctx to be mutable on the struct. I also think interior mutability makes the API more flexible, because process_batch can then take &self. Example:
```rust
let mut processor = ContinuousStreamProcessor::new();
// This works fine with either &self or &mut self
processor.process_batch(&[Data::new()]);
// But what if we want to call it through shared references?
let processors = vec![&processor, &processor];
for p in &processors {
    p.process_batch(&[Data::new()]); // This won't compile if process_batch takes &mut self
}
```
I don't feel too strongly either way.
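To make the trade-off in this thread concrete, here is a minimal, self-contained sketch of the `Cell` approach. `Processor` and `run_through_shared_refs` are hypothetical stand-ins, not the PR's code: the real struct also holds a `SessionContext`, and the real `process_batch` is async.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for the PR's processor (the real one also holds a `SessionContext`).
struct Processor {
    last_processed_index: Cell<u64>,
}

impl Processor {
    fn new() -> Self {
        Self { last_processed_index: Cell::new(0) }
    }

    /// Takes `&self`: the cursor is advanced through the `Cell`, so no `&mut self` is needed.
    fn process_batch(&self, indices: &[u64]) {
        if let Some(max) = indices.iter().copied().max() {
            // Only move the cursor forward, never backward.
            if max > self.last_processed_index.get() {
                self.last_processed_index.set(max);
            }
        }
    }

    fn last_processed_index(&self) -> u64 {
        self.last_processed_index.get()
    }
}

/// Calls `process_batch` through two shared references to the same processor.
/// With a `&mut self` signature this function would not compile.
fn run_through_shared_refs(processor: &Processor) -> u64 {
    let processors = vec![processor, processor];
    for p in &processors {
        p.process_batch(&[1, 2, 3]);
    }
    processor.last_processed_index()
}
```

One cost of this design is that `Cell` is not `Sync`, so a struct containing it cannot be shared across threads by reference. If that matters, an atomic integer or `&mut self` may be the better fit.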
olap/src/aggregator/mod.rs
Outdated
debug!("Received shutdown signal, stopping continuous stream processing");
break;
}
result = processor.process_batch() => {
process_batch only ever returns true. Should we simplify this logic such that it returns Result<()>?
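As a rough illustration of this suggestion, the sketch below has the batch step return `Result<()>`-style values, so the driving loop only has to propagate errors with `?`. Everything here is an assumption made for illustration: `BatchError`, `run_loop`, the `max_batches` bound, and the `AtomicBool` shutdown flag are stand-ins, and the PR itself uses an async select loop rather than this synchronous one.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical error type for the sketch.
#[derive(Debug, PartialEq)]
struct BatchError(String);

/// Hypothetical batch step: `Ok(())` on success, `Err` on failure.
/// There is no `bool` to inspect, so callers cannot misread the return value.
fn process_batch(batch_no: u32) -> Result<(), BatchError> {
    if batch_no == 3 {
        // Simulated failure so the error path is visible.
        return Err(BatchError(format!("batch {} failed", batch_no)));
    }
    Ok(())
}

/// Runs batches until shutdown is requested, `max_batches` is reached, or a batch fails.
/// Returns the number of batches completed.
fn run_loop(shutdown: &AtomicBool, max_batches: u32) -> Result<u32, BatchError> {
    let mut completed = 0;
    while completed < max_batches && !shutdown.load(Ordering::Relaxed) {
        process_batch(completed + 1)?; // errors propagate; success carries no payload
        completed += 1;
    }
    Ok(completed)
}
```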
olap/src/aggregator/mod.rs
Outdated
let df = batch.cache().await?;
let df_clone = df.clone();
process_feed_batch(self.ctx.clone(), df_clone).await?;
nit: since the name df_clone doesn't convey any meaning, I'd say this should just be process_feed_batch(self.ctx.clone(), df.clone()).await?;
olap/src/aggregator/mod.rs
Outdated
.filter(col("index").gt(lit(self.last_processed_index.get())))?
.limit(0, Some(100))?;

let df = batch.cache().await?;
A comment here explaining that we need the cache because we make two passes over the data would be good: the first pass is the normal process_feed_batch pass, and the second pass gets the highest index.
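The two-pass pattern described here can be sketched without DataFusion, using a plain `Vec` in place of the cached `DataFrame`. This is an illustrative stand-in only: `Row`, `next_batch`, `process_once`, and the `sink` parameter are hypothetical names, and the sketch assumes the feed is already ordered by ascending `index`.

```rust
/// Hypothetical row type standing in for a record in the conclusion feed.
#[derive(Clone, Debug, PartialEq)]
struct Row {
    index: u64,
    payload: String,
}

/// Mirrors the `.limit(0, Some(100))` in the diff.
const BATCH_LIMIT: usize = 100;

/// Selects rows past the cursor, capped at `BATCH_LIMIT`.
/// Collecting into a `Vec` plays the role of `cache()`: the query is evaluated once.
fn next_batch(feed: &[Row], last_processed_index: u64) -> Vec<Row> {
    feed.iter()
        .filter(|r| r.index > last_processed_index)
        .take(BATCH_LIMIT)
        .cloned()
        .collect()
}

/// Processes one batch and returns the new cursor value.
fn process_once(feed: &[Row], last_processed_index: u64, sink: &mut Vec<String>) -> u64 {
    // Materialize once, because the batch is read twice below.
    let batch = next_batch(feed, last_processed_index);

    // Pass 1: the normal processing pass (here, just copying payloads to a sink).
    for row in &batch {
        sink.push(row.payload.clone());
    }

    // Pass 2: find the highest index seen; an empty batch leaves the cursor unchanged.
    batch
        .iter()
        .map(|r| r.index)
        .max()
        .unwrap_or(last_processed_index)
}
```

Calling `process_once` repeatedly with the returned cursor walks the feed in chunks of at most 100 rows, and an empty batch leaves the cursor where it was.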
olap/src/aggregator/mod.rs
Outdated
#[test(tokio::test)]
// TODO: Create a test that checks how many times queries run. They should run continuously in a loop.
// #[tokio::test]
// async fn test_continuous_stream_processing() -> Result<()> {
Any luck on getting a test working with the custom context?
The OLAP aggregator will continuously run the data event and time event queries and store the results in Basin. This PR adds the logic to run those queries continuously.
We opted not to use DataFusion's streaming capabilities because they are relatively limited in scope and lack the features and flexibility our use case requires.