fix: use correct index for the conclusion feed. #561
Changes from 4 commits: c131d41, ef9dc4a, 7c34d06, fc4759c, bc03421, ed542f2
```diff
@@ -403,7 +403,7 @@ impl EventService {
                 .map_err(|e| {
                     Error::new_app(anyhow::anyhow!("Failed to serialize IPLD data: {}", e))
                 })?,
-            index: 0,
+            index: delivered as u64,
         }))
     }
 }
```

Review thread on this hunk:

> Tests should have caught this.

>> No test included an unsigned init event. That is why I added it to the generated data.
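The one-line fix above can be illustrated with a stand-alone sketch (all names here are hypothetical stand-ins, not the actual `EventService` types): each feed entry's index should carry that event's monotonic `delivered` counter, rather than a hardcoded 0 that makes every entry report the same resume point.

```rust
// Hypothetical stand-in for a conclusion-feed entry; only the index field
// matters for this sketch.
#[derive(Debug, PartialEq)]
struct FeedEntry {
    index: u64,
}

/// Map each event's delivered counter (an i64 in the store) to a feed entry.
fn to_entries(delivered_counters: &[i64]) -> Vec<FeedEntry> {
    delivered_counters
        .iter()
        // The bug was the equivalent of `index: 0` here: every entry got the
        // same index instead of its own delivered counter.
        .map(|&delivered| FeedEntry { index: delivered as u64 })
        .collect()
}

fn main() {
    // Three events delivered at counters 3, 4, and 7.
    println!("{:?}", to_entries(&[3, 4, 7]));
}
```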
```diff
@@ -10,6 +10,7 @@ use ceramic_core::{event_id::InvalidEventId, Cid, EventId, NodeId};
 use ceramic_event::unvalidated;
 use ceramic_sql::sqlite::{SqlitePool, SqliteTransaction};
 use ipld_core::ipld::Ipld;
+use itertools::Itertools;
 use recon::{AssociativeHash, HashCount, Key, Sha256a};

 use crate::store::{
```
```diff
@@ -370,17 +371,30 @@ impl CeramicOneEvent {
             all_blocks.iter().map(|row| row.block.clone()).collect(),
         )
         .await?;

+        // We need to match up the delivered index with each event. However all_blocks contains an
+        // item for each block within each event. We need to chunk all_blocks by event and then
+        // find the max delivered for each event. This will create an iterator of a single
+        // delivered value for each event. With that iterator we can zip it with the parsed block
+        // car files as there is a 1:1 mapping.
+        let event_chunks = all_blocks
+            .into_iter()
+            .chunk_by(|block| block.block.order_key.clone());
+        let delivered_iter = event_chunks
+            .into_iter()
+            .map(|(_, event_chunk)| event_chunk.map(|block| block.delivered).max());

         let result: Result<Vec<EventRowDelivered>> = parsed
             .into_iter()
-            .zip(all_blocks.iter())
-            .map(|((_, carfile), block)| {
+            .zip(delivered_iter)
+            .map(|((_, carfile), delivered)| {
                 let (cid, event) =
                     unvalidated::Event::<Ipld>::decode_car(carfile.as_slice(), false)
                         .map_err(|_| Error::new_fatal(anyhow!("Error parsing event row")))?;
                 Ok(EventRowDelivered {
                     cid,
                     event,
-                    delivered: block.delivered,
+                    delivered: delivered.expect("should always be one block per event"),
                 })
             })
             .collect();
```

Review thread on this hunk:

> We are keeping multiple copies of data in memory, can we move this logic into the SQL query itself? Then we can call, …

>> The challenge is that we need events, but we have to construct the events from the list of blocks that make up each event. So the …
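The grouping step above uses itertools' `chunk_by`, but the same idea can be sketched with only the standard library's `slice::chunk_by` (stable since Rust 1.77). The row type below is a hypothetical stand-in for the store's block rows, where consecutive rows sharing an `order_key` belong to one event.

```rust
// Hypothetical stand-in for a block row: one row per block, several
// consecutive rows per event, all carrying that event's delivered counter.
#[derive(Debug)]
struct BlockRow {
    order_key: String,
    delivered: i64,
}

/// Collapse consecutive rows of the same event into one max-`delivered`
/// value per event, mirroring the chunk_by + max step in the diff above.
fn max_delivered_per_event(all_blocks: &[BlockRow]) -> Vec<i64> {
    all_blocks
        // slice::chunk_by groups consecutive elements for which the predicate
        // holds, i.e. runs of rows that belong to the same event.
        .chunk_by(|a, b| a.order_key == b.order_key)
        .map(|chunk| {
            chunk
                .iter()
                .map(|b| b.delivered)
                .max()
                .expect("chunk_by never yields an empty chunk")
        })
        .collect()
}

fn main() {
    let blocks = vec![
        BlockRow { order_key: "event-a".into(), delivered: 5 },
        BlockRow { order_key: "event-a".into(), delivered: 5 },
        BlockRow { order_key: "event-b".into(), delivered: 7 },
    ];
    // One delivered value per event, 1:1 with the parsed events.
    println!("{:?}", max_delivered_per_event(&blocks));
}
```

This keeps the 1:1 mapping the comment describes: the resulting vector can be zipped directly with the per-event parsed CAR files.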
Review thread:

> Is this the correct behavior? If we want to fetch events since a highwater mark, we would get events since highwater_mark + 1.

>> Yes, without this change you always get back the last row from the previous batch as the first row in the next batch.

>> If you look in the code for how the highwater mark is made, you can see it already adds one to it. This is what I mean by saying they are one-based. Hence a TODO for later to investigate whether we can simplify things to make everything zero-based.
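The off-by-one discussed in this thread can be modeled abstractly (a toy filter over delivered values, not the actual SQL): if the next batch starts at the highwater mark inclusively, the previous batch's last row comes back again.

```rust
// Correct paging: a batch returns up to `limit` rows strictly after the
// highwater mark.
fn batch_after(rows: &[i64], highwater: i64, limit: usize) -> Vec<i64> {
    rows.iter().copied().filter(|&d| d > highwater).take(limit).collect()
}

// Buggy variant: starts at the highwater mark inclusively (>=), so the last
// row of the previous batch repeats as the first row of the next batch.
fn batch_from(rows: &[i64], highwater: i64, limit: usize) -> Vec<i64> {
    rows.iter().copied().filter(|&d| d >= highwater).take(limit).collect()
}

fn main() {
    let delivered = [1, 2, 3, 4, 5];
    let first = batch_after(&delivered, 0, 3); // [1, 2, 3]
    let next_ok = batch_after(&delivered, *first.last().unwrap(), 3); // [4, 5]
    let next_dup = batch_from(&delivered, *first.last().unwrap(), 3); // [3, 4, 5]: 3 repeats
    println!("{:?} {:?} {:?}", first, next_ok, next_dup);
}
```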