fix: use correct index for the conclusion feed. #561
Conversation
We found a few bugs in the conclusion feed implementation on the event service. Fixes will follow; however, this change updates the test to make the bugs obvious.

Bugs:
1. Indexes are duplicated
2. Unsigned event indexes are always zero
3. Off-by-one bug in the bounds of the highwater mark
The previous code would zip the parsed events with the all_blocks iterator. However, the all_blocks iterator was longer than the parsed events, so the delivered values became misaligned with the events, causing the index of conclusion feed events to be incorrect. Now we filter the all_blocks iterator down to a single value per event.
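As a rough illustration of the fix (with a hypothetical `BlockRow` type and field names, not the real ones), collapsing `all_blocks` to one `(event, delivered)` entry per event before zipping keeps the zip aligned, assuming rows for the same event are adjacent in the query result:

```rust
/// Hypothetical row type standing in for the real all_blocks rows.
struct BlockRow {
    event_cid: String,
    delivered: i64,
}

/// Collapse the block rows to a single delivered value per event so that
/// zipping with the parsed events stays aligned. Assumes rows for the same
/// event are adjacent, as they would be in an ordered query result.
fn delivered_per_event(all_blocks: &[BlockRow]) -> Vec<(String, i64)> {
    let mut out: Vec<(String, i64)> = Vec::new();
    for row in all_blocks {
        let is_new_event = out
            .last()
            .map(|(cid, _)| cid != &row.event_cid)
            .unwrap_or(true);
        if is_new_event {
            out.push((row.event_cid.clone(), row.delivered));
        }
    }
    out
}
```

With the collapsed list, the zip yields exactly one delivered value per parsed event.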
The previous code had a typo that always used 0 as the delivered/index for unsigned init events. This is now fixed.
The sqlite db uses 1-based highwater_mark values, meaning it compares delivered >= highwater_mark rather than delivered > highwater_mark. The conclusion feed doesn't expose the highwater mark directly; instead it returns the index and expects clients to resend the max index they have seen, which means we want an exclusive comparison. We achieve this by adding one to the highwater_mark.
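A minimal sketch of that adjustment (illustrative only; `events_after` is not a function in the codebase): bumping the client-supplied index by one turns the store's inclusive `>=` comparison into an effectively exclusive one.

```rust
/// Return the delivered indexes strictly after the maximum index the client
/// has already seen, given a store that compares with a one-based,
/// inclusive highwater mark (delivered >= highwater_mark).
fn events_after(delivered: &[i64], client_max_index: i64) -> Vec<i64> {
    let highwater_mark = client_max_index + 1; // turn >= into an effective >
    delivered
        .iter()
        .copied()
        .filter(|d| *d >= highwater_mark)
        .collect()
}

fn main() {
    // A client that has seen index 2 gets only index 3 back; without the +1
    // it would receive index 2 again as the first row of the next batch.
    assert_eq!(events_after(&[1, 2, 3], 2), vec![3]);
}
```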
```diff
@@ -403,7 +403,7 @@ impl EventService {
                     .map_err(|e| {
                         Error::new_app(anyhow::anyhow!("Failed to serialize IPLD data: {}", e))
                     })?,
-                index: 0,
+                index: delivered as u64,
```
!!
Tests should have caught this
No test included an unsigned init event. That is why I added it to the generated data.
Previously a global static delivered counter was used to assign and track delivered values. However, this makes testing hard: when multiple tests run, they affect each other's delivered values. This change moves the counter state into EventAccess, and everywhere a pool was used to access events it is replaced with an Arc<EventAccess>. This means we can have truly isolated delivered counters.
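A hedged sketch of the shape of that refactor (field and method names here are illustrative, not the actual EventAccess API): the counter lives inside the access type and is shared through an Arc, so each test constructs its own isolated instance.

```rust
use std::sync::{
    atomic::{AtomicI64, Ordering},
    Arc,
};

/// Illustrative stand-in for the access type: the delivered counter is owned
/// by the instance instead of living in a global static.
struct EventAccess {
    delivered_counter: AtomicI64,
    // ... the real type also holds the db pool, etc.
}

impl EventAccess {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            delivered_counter: AtomicI64::new(0),
        })
    }

    /// Hand out the next delivered value for this instance only.
    fn next_delivered(&self) -> i64 {
        self.delivered_counter.fetch_add(1, Ordering::SeqCst) + 1
    }
}

fn main() {
    // Two independent instances no longer influence each other's counters.
    let a = EventAccess::new();
    let b = EventAccess::new();
    assert_eq!(a.next_delivered(), 1);
    assert_eq!(a.next_delivered(), 2);
    assert_eq!(b.next_delivered(), 1);
}
```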
We have an access-type pattern for structs that contain the logic to read specific tables in the db. They were called CeramicOne{Table}; with this change they are renamed to {Table}Access. The CeramicOne prefix was meaningless, and the table name alone was not enough to distinguish the type from other types related to the same entities.
```diff
@@ -370,17 +371,30 @@ impl CeramicOneEvent {
             all_blocks.iter().map(|row| row.block.clone()).collect(),
         )
         .await?;
 
+        // We need to match up the delivered index with each event. However all_blocks contains an
```
We are keeping multiple copies of the data in memory; can we move this logic into the SQL query itself?
```rust
let max_highwater = sqlx::query_as::<_, DeliveredEventBlockRow>(
    r#"
    SELECT
        event_id,
        MAX(delivered) AS max_delivered,
        block_data
    FROM
        events_table
    WHERE
        delivered > ? -- Only fetch events with a delivered value greater than the highwater mark
    GROUP BY
        event_id
    ORDER BY
        max_delivered ASC
    LIMIT ?
    "#,
)
```
What do you think? Is this a better approach?
Then we can call:
```rust
.bind(highwater)
.bind(limit)
.fetch_all(pool.reader())
.await?
.into_iter()
.map(|row| {
    // Decode the CAR file from the block data
    ....
    // Add the parsed event to the list
    ....
    // Update the max highwater mark
})
.max()
.unwrap_or(highwater); // Use the input highwater if no new events are found
```
The challenge is that we need events, but we have to construct each event from the list of blocks that make it up. So the all_blocks query needs to join with the blocks table in order to get all the data. This PR doesn't change how much memory is needed (we were previously keeping a copy of the data in memory). I think optimizing this query should be future work (if we find it's a cause of slowdown).
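To make the constraint concrete, here is a hedged sketch (hypothetical row type and field names, not the actual query result) of why one row per event isn't enough: every block row belonging to an event has to be kept and grouped back together before the event can be decoded from its CAR data.

```rust
use std::collections::BTreeMap;

/// Hypothetical row shape for the joined events/blocks query result.
struct BlockRow {
    delivered: i64,
    event_cid: String,
    car_bytes: Vec<u8>,
}

/// Group the block rows back into per-event lists, ordered by delivered,
/// so each event can be reconstructed from all of its blocks.
fn group_blocks_by_event(all_blocks: Vec<BlockRow>) -> BTreeMap<(i64, String), Vec<Vec<u8>>> {
    let mut events: BTreeMap<(i64, String), Vec<Vec<u8>>> = BTreeMap::new();
    for row in all_blocks {
        events
            .entry((row.delivered, row.event_cid))
            .or_default()
            .push(row.car_bytes);
    }
    events
}
```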
```diff
@@ -33,7 +33,9 @@ impl ConclusionFeed for EventService {
         limit: i64,
     ) -> anyhow::Result<Vec<ConclusionEvent>> {
         let raw_events = self
             .fetch_events_since_highwater_mark(highwater_mark, limit)
+            // TODO: Can we make highwater_marks zero based?
```
Is this the correct behavior? If we want to fetch events since a highwater mark, we would get events since highwater_mark + 1.
Yes, without this change you always get back the last row of the previous batch as the first row of the next batch.
If you look at the code for how the highwater mark is constructed, you can see it already adds one to it. That is what I mean by saying the values are one-based. Hence the TODO to investigate later whether we can simplify things and make everything zero-based.
Thank you for the detailed commit messages and explanations! Reviewing this code commit by commit was fun.
LGTM!
The first commit in this change updates the test to expose all three bugs. The following commits each fix one of the bugs.
It turns out the delivered insertion was not test-friendly. I added two more refactor commits: one makes the delivered counter isolated per EventService, and another renames some types whose names had become less clear because of other refactors.