Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use correct index for the conclusion feed. #561

Merged
merged 6 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions event-svc/benches/sqlite_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ceramic_event::unvalidated::{
signed::{self, Signer},
Builder,
};
use ceramic_event_svc::store::{CeramicOneEvent, EventInsertable};
use ceramic_event_svc::store::{EventAccess, EventInsertable};
use ceramic_sql::sqlite::SqlitePool;
use criterion2::{criterion_group, criterion_main, BatchSize, Criterion};
use ipld_core::ipld::Ipld;
Expand Down Expand Up @@ -91,7 +91,7 @@ async fn model_routine(input: ModelSetup) {
let futs = futs.into_iter().map(|batch| {
let store = input.pool.clone();
let set = batch.into_iter().collect::<Vec<_>>();
async move { CeramicOneEvent::insert_many(&store, set.iter()).await }
async move { EventAccess::insert_many(&store, set.iter()).await }
});
futures::future::join_all(futs).await;
}
Expand Down
4 changes: 3 additions & 1 deletion event-svc/src/event/feed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ impl ConclusionFeed for EventService {
limit: i64,
) -> anyhow::Result<Vec<ConclusionEvent>> {
let raw_events = self
.fetch_events_since_highwater_mark(highwater_mark, limit)
// TODO: Can we make highwater_marks zero based?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the correct behavior? If we want to fetch events since a highwatermark, we would get events since highwater_mark + 1

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes without this change you always get back the last row from the previous batch as the first row in the next batch.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look in the code for how the highwater mark is made you can see it already is adding one to it. This is what I mean by saying they are one based. Hence a TODO for later to investigate if we can simplify things to make everything zero based.

// highwater marks are 1 based, add one
.fetch_events_since_highwater_mark(highwater_mark + 1, limit)
.await?;

let conclusion_events_futures = raw_events
Expand Down
37 changes: 20 additions & 17 deletions event-svc/src/event/order_events.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use crate::store::{CeramicOneEvent, EventInsertable};
use crate::store::{EventAccess, EventInsertable};
use crate::Result;
use ceramic_core::Cid;
use ceramic_sql::sqlite::SqlitePool;

/// Groups the events into lists of those with a delivered prev and those without. This can be used to return an error if the event is required to have history.
/// The events will be marked as deliverable so that they can be passed directly to the store to be persisted. It assumes init events have already been marked deliverable.
Expand Down Expand Up @@ -41,15 +41,15 @@ impl OrderEvents {

/// Uses the in memory set and the database to try to follow prev chains and mark deliverable
pub async fn find_currently_deliverable(
pool: &SqlitePool,
event_access: Arc<EventAccess>,
candidate_events: Vec<EventInsertable>,
) -> Result<Self> {
Self::find_deliverable_internal(Some(pool), candidate_events).await
Self::find_deliverable_internal(Some(event_access), candidate_events).await
}

/// Builds deliverable events, using the db pool if provided
async fn find_deliverable_internal(
pool: Option<&SqlitePool>,
event_access: Option<Arc<EventAccess>>,
candidate_events: Vec<EventInsertable>,
) -> Result<Self> {
let mut new_cids: HashMap<Cid, bool> = HashMap::with_capacity(candidate_events.len());
Expand Down Expand Up @@ -88,9 +88,9 @@ impl OrderEvents {
} else {
undelivered_prevs_in_memory.push_back(event);
}
} else if let Some(pool) = pool {
} else if let Some(event_access) = &event_access {
let (_exists, prev_deliverable) =
CeramicOneEvent::deliverable_by_cid(pool, prev).await?;
event_access.deliverable_by_cid(prev).await?;
if prev_deliverable {
event.set_deliverable(true);
*new_cids.get_mut(event.cid()).expect("CID must exist") = true;
Expand Down Expand Up @@ -145,7 +145,10 @@ impl OrderEvents {

#[cfg(test)]
mod test {
use std::sync::Arc;

use ceramic_core::EventId;
use ceramic_sql::sqlite::SqlitePool;
use rand::seq::SliceRandom;
use rand::thread_rng;
use recon::ReconItem;
Expand Down Expand Up @@ -225,11 +228,12 @@ mod test {
#[test(tokio::test)]
async fn out_of_order_streams_valid() {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());

let (stream_1, stream_2, mut to_insert) = get_2_streams().await;
to_insert.shuffle(&mut thread_rng());

let ordered = OrderEvents::find_currently_deliverable(&pool, to_insert)
let ordered = OrderEvents::find_currently_deliverable(event_access, to_insert)
.await
.unwrap();
assert!(
Expand All @@ -254,13 +258,14 @@ mod test {
#[test(tokio::test)]
async fn missing_history_in_memory() {
let pool = SqlitePool::connect_in_memory().await.unwrap();
let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());

let (stream_1, stream_2, mut to_insert) = get_2_streams().await;
// if event 2 is missing from stream_1, we will sort stream_2 but stream_1 will be "missing history" after the init event
to_insert.remove(1);
to_insert.shuffle(&mut thread_rng());

let ordered = OrderEvents::find_currently_deliverable(&pool, to_insert)
let ordered = OrderEvents::find_currently_deliverable(event_access, to_insert)
.await
.unwrap();
assert_eq!(
Expand All @@ -285,16 +290,15 @@ mod test {
// so that an API write that had never seen event 2, would not able to write event 3 or after
// the recon ordering task would sort this and mark all deliverable
let pool = SqlitePool::connect_in_memory().await.unwrap();
let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());

let stream_1 = get_n_events(10).await;
let (to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await;
CeramicOneEvent::insert_many(&pool, to_insert.iter())
.await
.unwrap();
event_access.insert_many(to_insert.iter()).await.unwrap();

remaining.shuffle(&mut thread_rng());

let ordered = OrderEvents::find_currently_deliverable(&pool, remaining)
let ordered = OrderEvents::find_currently_deliverable(event_access, remaining)
.await
.unwrap();
assert_eq!(
Expand All @@ -310,24 +314,23 @@ mod test {
// this test validates we can order in memory events with each other if one of them has a prev
// in the database that is deliverable, in which case the entire chain is deliverable
let pool = SqlitePool::connect_in_memory().await.unwrap();
let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());

let stream_1 = get_n_events(10).await;
let (mut to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await;
for item in to_insert.as_mut_slice() {
item.set_deliverable(true)
}

CeramicOneEvent::insert_many(&pool, to_insert.iter())
.await
.unwrap();
event_access.insert_many(to_insert.iter()).await.unwrap();

let expected = remaining
.iter()
.map(|i| i.order_key().clone())
.collect::<Vec<_>>();
remaining.shuffle(&mut thread_rng());

let ordered = OrderEvents::find_currently_deliverable(&pool, remaining)
let ordered = OrderEvents::find_currently_deliverable(event_access, remaining)
.await
.unwrap();
assert!(
Expand Down
Loading
Loading