diff --git a/event-svc/benches/sqlite_store.rs b/event-svc/benches/sqlite_store.rs
index 97245128..15ae673f 100644
--- a/event-svc/benches/sqlite_store.rs
+++ b/event-svc/benches/sqlite_store.rs
@@ -6,7 +6,7 @@ use ceramic_event::unvalidated::{
     signed::{self, Signer},
     Builder,
 };
-use ceramic_event_svc::store::{CeramicOneEvent, EventInsertable};
+use ceramic_event_svc::store::{EventAccess, EventInsertable};
 use ceramic_sql::sqlite::SqlitePool;
 use criterion2::{criterion_group, criterion_main, BatchSize, Criterion};
 use ipld_core::ipld::Ipld;
@@ -91,7 +91,7 @@ async fn model_routine(input: ModelSetup) {
     let futs = futs.into_iter().map(|batch| {
         let store = input.pool.clone();
         let set = batch.into_iter().collect::<Vec<_>>();
-        async move { CeramicOneEvent::insert_many(&store, set.iter()).await }
+        async move { EventAccess::insert_many(&store, set.iter()).await }
     });
     futures::future::join_all(futs).await;
 }
diff --git a/event-svc/src/event/feed.rs b/event-svc/src/event/feed.rs
index 062b8da3..af9326b3 100644
--- a/event-svc/src/event/feed.rs
+++ b/event-svc/src/event/feed.rs
@@ -33,7 +33,9 @@ impl ConclusionFeed for EventService {
         limit: i64,
     ) -> anyhow::Result<Vec<ConclusionEvent>> {
         let raw_events = self
-            .fetch_events_since_highwater_mark(highwater_mark, limit)
+            // TODO: Can we make highwater_marks zero based?
+            // highwater marks are 1 based, add one
+            .fetch_events_since_highwater_mark(highwater_mark + 1, limit)
             .await?;

         let conclusion_events_futures = raw_events
diff --git a/event-svc/src/event/order_events.rs b/event-svc/src/event/order_events.rs
index ba0489cc..e947a4f1 100644
--- a/event-svc/src/event/order_events.rs
+++ b/event-svc/src/event/order_events.rs
@@ -1,9 +1,9 @@
 use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;

-use crate::store::{CeramicOneEvent, EventInsertable};
+use crate::store::{EventAccess, EventInsertable};
 use crate::Result;
 use ceramic_core::Cid;
-use ceramic_sql::sqlite::SqlitePool;

 /// Groups the events into lists of those with a delivered prev and those without. This can be used to return an error if the event is required to have history.
 /// The events will be marked as deliverable so that they can be passed directly to the store to be persisted. It assumes init events have already been marked deliverable.
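Reviewer note on the feed.rs hunk above: per the new TODO, the store's highwater marks are 1-based while `ConclusionFeed` callers treat them as 0-based, so the feed now adds one at the boundary. A minimal standalone sketch of that conversion (toy function, not the crate's API):

```rust
// The store numbers delivered events starting at 1, while a feed consumer
// that has seen nothing passes 0. Converting once at the boundary keeps the
// two numbering schemes from leaking into each other.
fn to_store_mark(feed_mark: i64) -> i64 {
    feed_mark + 1
}

fn main() {
    assert_eq!(to_store_mark(0), 1); // "everything" in store terms
    assert_eq!(to_store_mark(41), 42); // resume after the 41st delivered event
}
```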
@@ -41,15 +41,15 @@ impl OrderEvents {

     /// Uses the in memory set and the database to try to follow prev chains and mark deliverable
     pub async fn find_currently_deliverable(
-        pool: &SqlitePool,
+        event_access: Arc<EventAccess>,
         candidate_events: Vec<EventInsertable>,
     ) -> Result<Self> {
-        Self::find_deliverable_internal(Some(pool), candidate_events).await
+        Self::find_deliverable_internal(Some(event_access), candidate_events).await
     }

     /// Builds deliverable events, using the db pool if provided
     async fn find_deliverable_internal(
-        pool: Option<&SqlitePool>,
+        event_access: Option<Arc<EventAccess>>,
         candidate_events: Vec<EventInsertable>,
     ) -> Result<Self> {
         let mut new_cids: HashMap<Cid, bool> = HashMap::with_capacity(candidate_events.len());
@@ -88,9 +88,9 @@ impl OrderEvents {
                 } else {
                     undelivered_prevs_in_memory.push_back(event);
                 }
-            } else if let Some(pool) = pool {
+            } else if let Some(event_access) = &event_access {
                 let (_exists, prev_deliverable) =
-                    CeramicOneEvent::deliverable_by_cid(pool, prev).await?;
+                    event_access.deliverable_by_cid(prev).await?;
                 if prev_deliverable {
                     event.set_deliverable(true);
                     *new_cids.get_mut(event.cid()).expect("CID must exist") = true;
@@ -145,7 +145,10 @@ impl OrderEvents {

 #[cfg(test)]
 mod test {
+    use std::sync::Arc;
+
     use ceramic_core::EventId;
+    use ceramic_sql::sqlite::SqlitePool;
     use rand::seq::SliceRandom;
     use rand::thread_rng;
     use recon::ReconItem;
@@ -225,11 +228,12 @@ mod test {
     #[test(tokio::test)]
     async fn out_of_order_streams_valid() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
         let (stream_1, stream_2, mut to_insert) = get_2_streams().await;
         to_insert.shuffle(&mut thread_rng());

-        let ordered = OrderEvents::find_currently_deliverable(&pool, to_insert)
+        let ordered = OrderEvents::find_currently_deliverable(event_access, to_insert)
             .await
             .unwrap();
         assert!(
@@ -254,13 +258,14 @@ mod test {
     #[test(tokio::test)]
     async fn missing_history_in_memory() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
         let (stream_1, stream_2, mut to_insert) = get_2_streams().await;
         // if event 2 is missing from stream_1, we will sort stream_2 but stream_1 will be "missing history" after the init event
         to_insert.remove(1);
         to_insert.shuffle(&mut thread_rng());

-        let ordered = OrderEvents::find_currently_deliverable(&pool, to_insert)
+        let ordered = OrderEvents::find_currently_deliverable(event_access, to_insert)
             .await
             .unwrap();
         assert_eq!(
@@ -285,16 +290,15 @@ mod test {
         // so that an API write that had never seen event 2 would not be able to write event 3 or after
         // the recon ordering task would sort this and mark all deliverable
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
         let stream_1 = get_n_events(10).await;
         let (to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await;
-        CeramicOneEvent::insert_many(&pool, to_insert.iter())
-            .await
-            .unwrap();
+        event_access.insert_many(to_insert.iter()).await.unwrap();

         remaining.shuffle(&mut thread_rng());

-        let ordered = OrderEvents::find_currently_deliverable(&pool, remaining)
+        let ordered = OrderEvents::find_currently_deliverable(event_access, remaining)
             .await
             .unwrap();
         assert_eq!(
@@ -310,6 +314,7 @@ mod test {
         // this test validates we can order in memory events with each other if one of them has a prev
         // in the database that is deliverable, in which case the entire chain is deliverable
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
         let stream_1 = get_n_events(10).await;
         let (mut to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await;
@@ -317,9 +322,7 @@ mod test {
             item.set_deliverable(true)
         }

-        CeramicOneEvent::insert_many(&pool, to_insert.iter())
-            .await
-            .unwrap();
+        event_access.insert_many(to_insert.iter()).await.unwrap();

         let expected = remaining
             .iter()
             .collect::<Vec<_>>();
         remaining.shuffle(&mut thread_rng());

-        let ordered = OrderEvents::find_currently_deliverable(&pool, remaining)
+        let ordered = OrderEvents::find_currently_deliverable(event_access, remaining)
             .await
             .unwrap();
         assert!(
diff --git a/event-svc/src/event/ordering_task.rs b/event-svc/src/event/ordering_task.rs
index ae2903ab..bce9dbdb 100644
--- a/event-svc/src/event/ordering_task.rs
+++ b/event-svc/src/event/ordering_task.rs
@@ -1,13 +1,13 @@
 use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;

 use anyhow::anyhow;
 use ceramic_event::unvalidated;
-use ceramic_sql::sqlite::SqlitePool;
 use cid::Cid;
 use ipld_core::ipld::Ipld;
 use tracing::{debug, error, info, trace, warn};

-use crate::store::CeramicOneEvent;
+use crate::store::EventAccess;
 use crate::{Error, Result};

 use super::service::DiscoveredEvent;
@@ -32,18 +32,19 @@ impl OrderingTask {
     /// Discover all undelivered events in the database and mark them deliverable if possible.
     /// Returns the number of events marked deliverable.
     pub async fn process_all_undelivered_events(
-        pool: &SqlitePool,
+        event_access: Arc<EventAccess>,
         max_iterations: usize,
         batch_size: u32,
     ) -> Result<usize> {
-        OrderingState::process_all_undelivered_events(pool, max_iterations, batch_size).await
+        OrderingState::process_all_undelivered_events(event_access, max_iterations, batch_size)
+            .await
     }

     /// Spawn a task to run the ordering task background process in a loop
-    pub async fn run(pool: SqlitePool, q_depth: usize) -> DeliverableTask {
+    pub async fn run(event_access: Arc<EventAccess>, q_depth: usize) -> DeliverableTask {
         let (tx_inserted, rx_inserted) = tokio::sync::mpsc::channel::<DiscoveredEvent>(q_depth);

-        let handle = tokio::spawn(async move { Self::run_loop(pool, rx_inserted).await });
+        let handle = tokio::spawn(async move { Self::run_loop(event_access, rx_inserted).await });

         DeliverableTask {
             _handle: handle,
@@ -52,7 +53,7 @@ impl OrderingTask {
     }

     async fn run_loop(
-        pool: SqlitePool,
+        event_access: Arc<EventAccess>,
         mut rx_inserted: tokio::sync::mpsc::Receiver<DiscoveredEvent>,
     ) {
         let mut state = OrderingState::new();
@@ -66,7 +67,7 @@ impl OrderingTask {
                     state.add_inserted_events(recon_events);

                     if state
-                        .process_streams(&pool)
+                        .process_streams(Arc::clone(&event_access))
                         .await
                         .map_err(Self::log_error)
                         .is_err()
@@ -76,7 +77,10 @@ impl OrderingTask {
             }
         }

-        let _ = state.process_streams(&pool).await.map_err(Self::log_error);
+        let _ = state
+            .process_streams(event_access)
+            .await
+            .map_err(Self::log_error);
     }

     /// Log an error and return a result that can be used to stop the task if it was fatal
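The `run`/`run_loop` hunks above move the ordering task from owning a `SqlitePool` to owning an `Arc<EventAccess>`. A standalone sketch of the same task shape — a spawned loop, a bounded mpsc channel, and an `Arc` clone per iteration — with stand-in types, assuming only the tokio runtime:

```rust
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Stand-in for Arc<EventAccess>; the loop only needs shared access.
    let event_access = Arc::new(String::from("event-access"));
    let (tx_inserted, mut rx_inserted) = tokio::sync::mpsc::channel::<u64>(16);

    let handle = tokio::spawn(async move {
        while let Some(event) = rx_inserted.recv().await {
            // Mirrors Arc::clone(&event_access) before each process_streams call.
            let access = Arc::clone(&event_access);
            println!("processing event {event} via {access}");
        }
        // Channel closed: one final drain, as in run_loop's trailing process_streams.
    });

    tx_inserted.send(1).await.unwrap();
    drop(tx_inserted); // close the channel so the task exits
    handle.await.unwrap();
}
```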
@@ -116,18 +120,16 @@ impl StreamEvent {
     }

     /// Builds a stream event from the database if it exists.
-    async fn load_by_cid(pool: &SqlitePool, cid: EventCid) -> Result<Option<Self>> {
+    async fn load_by_cid(event_access: Arc<EventAccess>, cid: EventCid) -> Result<Option<Self>> {
         // TODO: Condense the multiple DB queries happening here into a single query
-        let (exists, deliverable) = CeramicOneEvent::deliverable_by_cid(pool, &cid).await?;
+        let (exists, deliverable) = event_access.deliverable_by_cid(&cid).await?;
         if exists {
-            let data = CeramicOneEvent::value_by_cid(pool, &cid)
-                .await?
-                .ok_or_else(|| {
-                    Error::new_app(anyhow!(
-                        "Missing event data for event that must exist: CID={}",
-                        cid
-                    ))
-                })?;
+            let data = event_access.value_by_cid(&cid).await?.ok_or_else(|| {
+                Error::new_app(anyhow!(
+                    "Missing event data for event that must exist: CID={}",
+                    cid
+                ))
+            })?;
             let (_cid, parsed) = unvalidated::Event::<Ipld>::decode_car(data.as_slice(), false)
                 .map_err(Error::new_app)?;

@@ -326,7 +328,7 @@ impl StreamEvents {
         }
     }

-    async fn order_events(&mut self, pool: &SqlitePool) -> Result<()> {
+    async fn order_events(&mut self, event_access: Arc<EventAccess>) -> Result<()> {
         // We collect everything we can into memory and then order things.
         // If our prev is deliverable then we can mark ourselves as deliverable. If our prev wasn't deliverable yet,
         // we track it and repeat (i.e. add it to our state and the set we're iterating to attempt to load its prev).
@@ -366,7 +368,7 @@ impl StreamEvents {
                     // nothing to do until it arrives on the channel
                 }
             } else if let Some(discovered_prev) =
-                StreamEvent::load_by_cid(pool, desired_prev).await?
+                StreamEvent::load_by_cid(Arc::clone(&event_access), desired_prev).await?
             {
                 match &discovered_prev {
                     // we found our prev in the database and it's deliverable, so we're deliverable now
@@ -456,16 +458,18 @@ impl OrderingState {

     /// Process every stream we know about that has undelivered events that should be "unlocked" now. This could be adjusted to commit things in batches,
     /// but for now it assumes it can process all the streams and events in one go. It should be idempotent, so if it fails, it can be retried.
-    async fn process_streams(&mut self, pool: &SqlitePool) -> Result<()> {
+    async fn process_streams(&mut self, event_access: Arc<EventAccess>) -> Result<()> {
         for (_stream_cid, stream_events) in self.pending_by_stream.iter_mut() {
             if stream_events.should_process {
-                stream_events.order_events(pool).await?;
+                stream_events
+                    .order_events(Arc::clone(&event_access))
+                    .await?;
                 self.deliverable
                     .extend(stream_events.new_deliverable.iter());
             }
         }

-        match self.persist_ready_events(pool).await {
+        match self.persist_ready_events(Arc::clone(&event_access)).await {
             Ok(_) => {}
             Err(err) => {
                 // Clear the queue as we'll rediscover it on the next run, rather than try to double update everything.
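The comment at the top of `order_events` is the heart of the algorithm: an event can be marked deliverable exactly when its prev is. A standalone toy version of that fixed-point walk, with plain maps standing in for the in-memory set plus database lookups:

```rust
use std::collections::HashMap;

fn main() {
    // event -> prev; "init" has no prev and is deliverable by definition.
    let prev: HashMap<&str, &str> =
        HashMap::from([("e1", "init"), ("e2", "e1"), ("e3", "e2")]);
    let mut deliverable = vec!["init"];

    // Keep promoting events whose prev is already deliverable until nothing changes.
    let mut changed = true;
    while changed {
        changed = false;
        for (event, p) in &prev {
            if deliverable.contains(p) && !deliverable.contains(event) {
                deliverable.push(*event);
                changed = true;
            }
        }
    }
    assert_eq!(deliverable.len(), 4); // the whole chain unlocks from init
}
```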
@@ -488,7 +492,7 @@ impl OrderingState {
     /// Process all undelivered events in the database. This is a blocking operation that could take a long time.
     /// It is intended to be run at startup but could be used on an interval or after some errors to recover.
     pub(crate) async fn process_all_undelivered_events(
-        pool: &SqlitePool,
+        event_access: Arc<EventAccess>,
         max_iterations: usize,
         batch_size: u32,
     ) -> Result<usize> {
@@ -499,9 +503,9 @@ impl OrderingState {
         let mut highwater = 0;
         while iter_cnt < max_iterations {
             iter_cnt += 1;
-            let (undelivered, new_hw) =
-                CeramicOneEvent::undelivered_with_values(pool, highwater, batch_size.into())
-                    .await?;
+            let (undelivered, new_hw) = event_access
+                .undelivered_with_values(highwater, batch_size.into())
+                .await?;
             highwater = new_hw;
             let found_something = !undelivered.is_empty();
             let found_everything = undelivered.len() < batch_size as usize;
@@ -512,7 +516,7 @@ impl OrderingState {
                 // In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them
                 // or otherwise mark them ignored somehow. When this function ends, we do drop everything so for now it's probably okay.
                 let number_processed = state
-                    .process_undelivered_events_batch(pool, undelivered)
+                    .process_undelivered_events_batch(Arc::clone(&event_access), undelivered)
                     .await?;
                 event_cnt += number_processed;
                 if event_cnt % LOG_EVERY_N_ENTRIES < number_processed {
@@ -532,7 +536,7 @@ impl OrderingState {

     async fn process_undelivered_events_batch(
         &mut self,
-        pool: &SqlitePool,
+        event_access: Arc<EventAccess>,
         event_data: Vec<(Cid, unvalidated::Event<Ipld>)>,
     ) -> Result<usize> {
         trace!(cnt=%event_data.len(), "Processing undelivered events batch");
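For the batch loop in `process_all_undelivered_events`, two exits matter: the `max_iterations` cap and a short batch (`found_everything`), which means the final page was reached. A standalone sketch of just that control flow, with toy counts in place of database reads:

```rust
// Page through a fixed amount of undelivered work, stopping at the iteration
// cap or as soon as a batch comes back short of batch_size.
fn drain_in_batches(total: usize, max_iterations: usize, batch_size: usize) -> usize {
    let mut processed = 0;
    let mut iter_cnt = 0;
    while iter_cnt < max_iterations {
        iter_cnt += 1;
        let batch = batch_size.min(total - processed);
        processed += batch;
        // A short batch means the last page was reached ("found_everything").
        if batch < batch_size {
            break;
        }
    }
    processed
}

fn main() {
    // Mirrors test_undelivered_batch_iterations_ends_early: the cap wins.
    assert_eq!(drain_in_batches(45, 4, 10), 40);
    // Mirrors ends_when_all_found: a short batch stops the loop early.
    assert_eq!(drain_in_batches(7, 100_000, 5), 7);
}
```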
@@ -565,27 +569,27 @@ impl OrderingState {
                 "Found init events in undelivered batch. This should never happen.",
             );
             debug_assert!(false);
-            let mut tx = pool.begin_tx().await?;
+            let mut tx = event_access.begin_tx().await?;
             for cid in discovered_inits {
-                CeramicOneEvent::mark_ready_to_deliver(&mut tx, &cid).await?;
+                event_access.mark_ready_to_deliver(&mut tx, &cid).await?;
             }
             tx.commit().await?;
         }

-        self.process_streams(pool).await?;
+        self.process_streams(event_access).await?;

         Ok(event_cnt)
     }

     /// We should improve the error handling and likely add some batching if the number of ready events is very high.
     /// We copy the events up front to avoid losing any events if the task is cancelled.
-    async fn persist_ready_events(&mut self, pool: &SqlitePool) -> Result<()> {
+    async fn persist_ready_events(&mut self, event_access: Arc<EventAccess>) -> Result<()> {
         if !self.deliverable.is_empty() {
             tracing::debug!(count=%self.deliverable.len(), "Marking events as ready to deliver");
-            let mut tx = pool.begin_tx().await?;
+            let mut tx = event_access.begin_tx().await?;
             // We process the ready events as a FIFO queue so they are marked delivered before events that were added after and depend on them.
             // Could use `pop_front` but we want to make sure we commit and then clear everything at once.
             for cid in &self.deliverable {
-                CeramicOneEvent::mark_ready_to_deliver(&mut tx, cid).await?;
+                event_access.mark_ready_to_deliver(&mut tx, cid).await?;
             }
             tx.commit().await?;
             self.deliverable.clear();
@@ -597,6 +601,7 @@ impl OrderingState {
 #[cfg(test)]
 mod test {
     use crate::store::EventInsertable;
+    use ceramic_sql::sqlite::SqlitePool;
     use test_log::test;

     use crate::{
@@ -624,20 +629,19 @@ mod test {
     #[test(tokio::test)]
     async fn test_undelivered_batch_empty() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
-        let processed = OrderingState::process_all_undelivered_events(&pool, 10, 100)
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
+        let processed = OrderingState::process_all_undelivered_events(event_access, 10, 100)
             .await
             .unwrap();
         assert_eq!(0, processed);
     }

-    async fn insert_10_with_9_undelivered(pool: &SqlitePool) -> Vec<EventInsertable> {
+    async fn insert_10_with_9_undelivered(event_access: Arc<EventAccess>) -> Vec<EventInsertable> {
         let mut insertable = get_n_insertable_events(10).await;
         let mut init = insertable.remove(0);
         init.set_deliverable(true);

-        let new = CeramicOneEvent::insert_many(pool, insertable.iter())
-            .await
-            .unwrap();
+        let new = event_access.insert_many(insertable.iter()).await.unwrap();

         assert_eq!(9, new.inserted.len());
         assert_eq!(
@@ -648,9 +652,7 @@ mod test {
                 .count()
         );

-        let new = CeramicOneEvent::insert_many(pool, [&init].into_iter())
-            .await
-            .unwrap();
+        let new = event_access.insert_many([&init].into_iter()).await.unwrap();
         assert_eq!(1, new.inserted.len());
         assert_eq!(
             1,
@@ -665,35 +667,33 @@ mod test {
     #[test(tokio::test)]
     async fn test_undelivered_batch_offset() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());

-        insert_10_with_9_undelivered(&pool).await;
-        let (_, events) = CeramicOneEvent::new_events_since_value(&pool, 0, 100)
-            .await
-            .unwrap();
+        insert_10_with_9_undelivered(Arc::clone(&event_access)).await;
+        let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
         assert_eq!(1, events.len());

-        let processed = OrderingState::process_all_undelivered_events(&pool, 1, 5)
-            .await
-            .unwrap();
+        let processed =
+            OrderingState::process_all_undelivered_events(Arc::clone(&event_access), 1, 5)
+                .await
+                .unwrap();
         assert_eq!(5, processed);
-        let (_, events) = CeramicOneEvent::new_events_since_value(&pool, 0, 100)
-            .await
-            .unwrap();
+        let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
         assert_eq!(6, events.len());
         // the last 5 are processed and we have 10 delivered
-        let processed = OrderingState::process_all_undelivered_events(&pool, 1, 5)
-            .await
-            .unwrap();
+        let processed =
+            OrderingState::process_all_undelivered_events(Arc::clone(&event_access), 1, 5)
+                .await
+                .unwrap();
         assert_eq!(4, processed);
-        let (_, events) = CeramicOneEvent::new_events_since_value(&pool, 0, 100)
-            .await
-            .unwrap();
+        let (_, events) = event_access.new_events_since_value(0, 100).await.unwrap();
         assert_eq!(10, events.len());
         // nothing left
-        let processed = OrderingState::process_all_undelivered_events(&pool, 1, 100)
-            .await
-            .unwrap();
+        let processed =
+            OrderingState::process_all_undelivered_events(Arc::clone(&event_access), 1, 100)
+                .await
+                .unwrap();
         assert_eq!(0, processed);
     }
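The FIFO comment in `persist_ready_events` above encodes an ordering invariant: a prev must receive its delivered number before anything that depends on it. A standalone sketch with toy data, a plain counter standing in for the atomic `next_deliverable`:

```rust
use std::collections::VecDeque;

fn main() {
    // Ready events in dependency order: init first, then its descendants.
    let deliverable: VecDeque<&str> = VecDeque::from(["init", "data-1", "data-2"]);
    let mut counter = 0i64; // stands in for next_deliverable()

    for cid in &deliverable {
        counter += 1;
        println!("mark {cid} ready to deliver as #{counter}");
    }
    // Only after every mark succeeds (the commit) would the queue be cleared;
    // on failure the whole batch is rediscovered and retried.
}
```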
@@ -701,97 +701,92 @@ mod test {
     #[test(tokio::test)]
     async fn test_undelivered_batch_iterations_ends_early() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
         // create 5 streams with 9 undelivered events each
-        insert_10_with_9_undelivered(&pool).await;
-        insert_10_with_9_undelivered(&pool).await;
-        insert_10_with_9_undelivered(&pool).await;
-        insert_10_with_9_undelivered(&pool).await;
-        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(Arc::clone(&event_access)).await;
+        insert_10_with_9_undelivered(Arc::clone(&event_access)).await;
+        insert_10_with_9_undelivered(Arc::clone(&event_access)).await;
+        insert_10_with_9_undelivered(Arc::clone(&event_access)).await;
+        insert_10_with_9_undelivered(Arc::clone(&event_access)).await;

-        let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
-            .await
-            .unwrap();
+        let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(5, event.len());

-        let _res = OrderingState::process_all_undelivered_events(&pool, 4, 10)
+        let _res = OrderingState::process_all_undelivered_events(Arc::clone(&event_access), 4, 10)
             .await
             .unwrap();

-        let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
-            .await
-            .unwrap();
+        let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(45, event.len());
     }

     #[test(tokio::test)]
     async fn test_undelivered_batch_iterations_ends_when_all_found() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
         // create 5 streams with 9 undelivered events each
-        insert_10_with_9_undelivered(&pool).await;
-        insert_10_with_9_undelivered(&pool).await;
-        insert_10_with_9_undelivered(&pool).await;
-        insert_10_with_9_undelivered(&pool).await;
-        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(Arc::clone(&event_access)).await;
+        insert_10_with_9_undelivered(Arc::clone(&event_access)).await;
+        insert_10_with_9_undelivered(Arc::clone(&event_access)).await;
+        insert_10_with_9_undelivered(Arc::clone(&event_access)).await;
+        insert_10_with_9_undelivered(Arc::clone(&event_access)).await;

-        let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
-            .await
-            .unwrap();
+        let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(5, event.len());

-        let _res = OrderingState::process_all_undelivered_events(&pool, 100_000_000, 5)
-            .await
-            .unwrap();
-
-        let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
-            .await
-            .unwrap();
+        let _res = OrderingState::process_all_undelivered_events(
+            Arc::clone(&event_access),
+            100_000_000,
+            5,
+        )
+        .await
+        .unwrap();
+
+        let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(50, event.len());
     }

     #[test(tokio::test)]
     async fn test_process_all_undelivered_one_batch() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
         // create 5 streams with 9 undelivered events each
         let expected_a = Vec::from_iter(
-            insert_10_with_9_undelivered(&pool)
+            insert_10_with_9_undelivered(Arc::clone(&event_access))
                 .await
                 .into_iter()
                 .map(|e| *e.cid()),
         );
         let expected_b = Vec::from_iter(
-            insert_10_with_9_undelivered(&pool)
+            insert_10_with_9_undelivered(Arc::clone(&event_access))
                 .await
                 .into_iter()
                 .map(|e| *e.cid()),
         );
         let expected_c = Vec::from_iter(
-            insert_10_with_9_undelivered(&pool)
+            insert_10_with_9_undelivered(Arc::clone(&event_access))
                 .await
                 .into_iter()
                 .map(|e| *e.cid()),
         );
         let expected_d = Vec::from_iter(
-            insert_10_with_9_undelivered(&pool)
+            insert_10_with_9_undelivered(Arc::clone(&event_access))
                 .await
                 .into_iter()
                 .map(|e| *e.cid()),
         );
         let expected_e = Vec::from_iter(
-            insert_10_with_9_undelivered(&pool)
+            insert_10_with_9_undelivered(Arc::clone(&event_access))
                 .await
                 .into_iter()
                 .map(|e| *e.cid()),
         );
-        let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
-            .await
-            .unwrap();
+        let (_hw, event) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(5, event.len());

-        let _res = OrderingState::process_all_undelivered_events(&pool, 1, 100)
+        let _res = OrderingState::process_all_undelivered_events(Arc::clone(&event_access), 1, 100)
             .await
             .unwrap();

-        let (_hw, cids) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
-            .await
-            .unwrap();
+        let (_hw, cids) = event_access.new_events_since_value(0, 1000).await.unwrap();
         assert_eq!(50, cids.len());
         assert_eq!(expected_a, build_expected(&cids, &expected_a));
         assert_eq!(expected_b, build_expected(&cids, &expected_b));
diff --git a/event-svc/src/event/service.rs b/event-svc/src/event/service.rs
index 48968f62..3cc2a6ff 100644
--- a/event-svc/src/event/service.rs
+++ b/event-svc/src/event/service.rs
@@ -21,7 +21,7 @@ use recon::ReconItem;
 use tracing::{trace, warn};

 use crate::event::validator::EthRpcProvider;
-use crate::store::{CeramicOneEvent, EventInsertable, EventRowDelivered};
+use crate::store::{EventAccess, EventInsertable, EventRowDelivered};
 use crate::{Error, Result};

 /// How many events to select at once to see if they've become deliverable when we have downtime
@@ -53,6 +53,7 @@ pub struct EventService {
     delivery_task: DeliverableTask,
     event_validator: EventValidator,
     pending_writes: Arc<Mutex<HashMap<Cid, Vec<UnvalidatedEvent>>>>,
+    pub(crate) event_access: Arc<EventAccess>,
 }
 /// An object that represents a set of blocks that can produce a stream of all blocks and lookup a
 /// block based on CID.
@@ -90,11 +91,13 @@ impl EventService {
         validate_events: bool,
         ethereum_rpc_providers: Vec<EthRpcProvider>,
     ) -> Result<Self> {
-        CeramicOneEvent::init_delivered_order(&pool).await?;
+        let event_access = Arc::new(EventAccess::try_new(pool.clone()).await?);

-        let delivery_task = OrderingTask::run(pool.clone(), PENDING_EVENTS_CHANNEL_DEPTH).await;
+        let delivery_task =
+            OrderingTask::run(Arc::clone(&event_access), PENDING_EVENTS_CHANNEL_DEPTH).await;

-        let event_validator = EventValidator::try_new(pool.clone(), ethereum_rpc_providers).await?;
+        let event_validator =
+            EventValidator::try_new(Arc::clone(&event_access), ethereum_rpc_providers).await?;

         let svc = Self {
             pool,
@@ -102,6 +105,7 @@ impl EventService {
             event_validator,
             delivery_task,
             pending_writes: Arc::new(Mutex::new(HashMap::default())),
+            event_access,
         };
         if process_undelivered_events {
             svc.process_all_undelivered_events().await?;
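The service constructor above now builds one `EventAccess` and shares it. A standalone sketch of the resulting ownership shape, with stand-in types rather than the crate's; the point is that the service, the ordering task, and the validator all observe the same delivered counter:

```rust
use std::sync::Arc;

struct EventAccess; // stand-in for the crate's stateful access type
struct OrderingTask { access: Arc<EventAccess> }
struct EventValidator { access: Arc<EventAccess> }
struct EventService {
    access: Arc<EventAccess>,
    task: OrderingTask,
    validator: EventValidator,
}

fn build() -> EventService {
    // One EventAccess is created from the pool; everyone else clones the Arc.
    let access = Arc::new(EventAccess);
    EventService {
        task: OrderingTask { access: Arc::clone(&access) },
        validator: EventValidator { access: Arc::clone(&access) },
        access,
    }
}

fn main() {
    let svc = build();
    assert_eq!(Arc::strong_count(&svc.access), 3); // three holders, one state
}
```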
@@ -142,7 +146,7 @@ impl EventService {
     }

     /// Given the incoming events, see if any of them are the init event that events were
-    /// 'pending on' and return all previously pending events that can now be validated.
+    /// 'pending on' and return all previously pending events that can now be validated.
     fn remove_unblocked_from_pending_q(&self, new: &[UnvalidatedEvent]) -> Vec<UnvalidatedEvent> {
         let new_init_cids = new
             .iter()
@@ -162,7 +166,7 @@ impl EventService {
     /// Returns the number of undelivered events that were updated
     async fn process_all_undelivered_events(&self) -> Result<usize> {
         OrderingTask::process_all_undelivered_events(
-            &self.pool,
+            Arc::clone(&self.event_access),
             MAX_ITERATIONS,
             DELIVERABLE_EVENTS_BATCH_SIZE,
         )
         .await
@@ -287,15 +291,18 @@ impl EventService {
     ) -> Result<(Vec, Vec)> {
         match deliverable_req {
             DeliverableRequirement::Immediate => {
-                let ordered =
-                    OrderEvents::find_currently_deliverable(&self.pool, to_insert).await?;
+                let ordered = OrderEvents::find_currently_deliverable(
+                    Arc::clone(&self.event_access),
+                    to_insert,
+                )
+                .await?;
                 let to_insert = ordered.deliverable().iter();
                 invalid.extend(ordered.missing_history().iter().map(|e| {
                     ValidationError::RequiresHistory {
                         key: e.order_key().clone(),
                     }
                 }));
-                let store_result = CeramicOneEvent::insert_many(&self.pool, to_insert).await?;
+                let store_result = self.event_access.insert_many(to_insert).await?;

                 Ok(Self::partition_store_result(store_result))
             }
             DeliverableRequirement::Asap => {
@@ -305,15 +312,14 @@ impl EventService {
                     .iter()
                     .chain(ordered.missing_history().iter());

-                let store_result = CeramicOneEvent::insert_many(&self.pool, to_insert).await?;
+                let store_result = self.event_access.insert_many(to_insert).await?;

                 self.notify_ordering_task(&store_result).await?;

                 Ok(Self::partition_store_result(store_result))
             }
             DeliverableRequirement::Lazy => {
-                let store_result =
-                    CeramicOneEvent::insert_many(&self.pool, to_insert.iter()).await?;
+                let store_result = self.event_access.insert_many(to_insert.iter()).await?;

                 Ok(Self::partition_store_result(store_result))
             }
@@ -403,7 +409,7 @@ impl EventService {
                     .map_err(|e| {
                         Error::new_app(anyhow::anyhow!("Failed to serialize IPLD data: {}", e))
                     })?,
-                index: 0,
+                index: delivered as u64,
             }))
         }
     }
@@ -411,9 +417,10 @@ impl EventService {

     // Helper method to get an event by its CID
     async fn get_event_by_cid(&self, cid: &Cid) -> Result<unvalidated::Event<Ipld>> {
-        let data_bytes = CeramicOneEvent::value_by_cid(&self.pool, cid)
-            .await?
-            .ok_or_else(|| Error::new_fatal(anyhow::anyhow!("Event not found for CID: {}", cid)))?;
+        let data_bytes =
+            self.event_access.value_by_cid(cid).await?.ok_or_else(|| {
+                Error::new_fatal(anyhow::anyhow!("Event not found for CID: {}", cid))
+            })?;

         let (_, event) = ceramic_event::unvalidated::Event::<Ipld>::decode_car(
             std::io::Cursor::new(data_bytes),
@@ -429,9 +436,10 @@ impl EventService {
         highwater_mark: i64,
         limit: i64,
     ) -> Result<Vec<EventRowDelivered>> {
-        let (_, data) =
-            CeramicOneEvent::new_events_since_value_with_data(&self.pool, highwater_mark, limit)
-                .await?;
+        let (_, data) = self
+            .event_access
+            .new_events_since_value_with_data(highwater_mark, limit)
+            .await?;
         Ok(data)
     }
diff --git a/event-svc/src/event/store.rs b/event-svc/src/event/store.rs
index 5809edb6..3880889c 100644
--- a/event-svc/src/event/store.rs
+++ b/event-svc/src/event/store.rs
@@ -11,7 +11,7 @@ use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a};
 use tracing::info;

 use crate::event::{DeliverableRequirement, EventService};
-use crate::store::{CeramicOneBlock, CeramicOneEvent, EventInsertable};
+use crate::store::{BlockAccess, EventInsertable};
 use crate::Error;

 use super::service::{InsertResult, ValidationError, ValidationRequirement};
@@ -71,7 +71,9 @@ impl recon::Store for EventService {
     /// Both range bounds are exclusive.
     /// Returns ReconResult<(Hash, count), Err>
     async fn hash_range(&self, range: Range<&Self::Key>) -> ReconResult<HashCount<Self::Hash>> {
-        let res = CeramicOneEvent::hash_range(&self.pool, range)
+        let res = self
+            .event_access
+            .hash_range(range)
             .await
             .map_err(Error::from)?;
         Ok(res)
@@ -88,7 +90,8 @@ impl recon::Store for EventService {
         limit: usize,
     ) -> ReconResult<Box<dyn Iterator<Item = Self::Key> + Send + 'static>> {
         Ok(Box::new(
-            CeramicOneEvent::range(&self.pool, range, offset, limit)
+            self.event_access
+                .range(range, offset, limit)
                 .await
                 .map_err(Error::from)?
                 .into_iter(),
@@ -106,7 +109,8 @@ impl recon::Store for EventService {
         limit: usize,
     ) -> ReconResult<Box<dyn Iterator<Item = (Self::Key, Vec<u8>)> + Send + 'static>> {
         Ok(Box::new(
-            CeramicOneEvent::range_with_values(&self.pool, range, offset, limit)
+            self.event_access
+                .range_with_values(range, offset, limit)
                 .await
                 .map_err(Error::from)?
                 .into_iter(),
@@ -114,9 +118,7 @@ impl recon::Store for EventService {
     }
     /// Return the number of keys within the range.
     async fn count(&self, range: Range<&Self::Key>) -> ReconResult<usize> {
-        Ok(CeramicOneEvent::count(&self.pool, range)
-            .await
-            .map_err(Error::from)?)
+        Ok(self.event_access.count(range).await.map_err(Error::from)?)
     }

     /// value_for_key returns
@@ -124,7 +126,9 @@ impl recon::Store for EventService {
     /// Ok(None) if not stored, and
     /// Err(e) if retrieving failed.
     async fn value_for_key(&self, key: &Self::Key) -> ReconResult<Option<Vec<u8>>> {
-        Ok(CeramicOneEvent::value_by_order_key(&self.pool, key)
+        Ok(self
+            .event_access
+            .value_by_order_key(key)
             .await
             .map_err(Error::from)?)
     }
@@ -133,17 +137,17 @@ impl recon::Store for EventService {
 #[async_trait::async_trait]
 impl iroh_bitswap::Store for EventService {
     async fn get_size(&self, cid: &Cid) -> anyhow::Result<usize> {
-        Ok(CeramicOneBlock::get_size(&self.pool, cid).await?)
+        Ok(BlockAccess::get_size(&self.pool, cid).await?)
     }
     async fn get(&self, cid: &Cid) -> anyhow::Result<Block> {
-        let maybe = CeramicOneBlock::get(&self.pool, cid).await?;
+        let maybe = BlockAccess::get(&self.pool, cid).await?;
         maybe.ok_or_else(|| anyhow!("block {} does not exist", cid))
     }
     async fn has(&self, cid: &Cid) -> anyhow::Result<bool> {
-        Ok(CeramicOneBlock::has(&self.pool, cid).await?)
+        Ok(BlockAccess::has(&self.pool, cid).await?)
     }
     async fn put(&self, block: &Block) -> anyhow::Result<bool> {
-        Ok(CeramicOneBlock::put(&self.pool, block).await?)
+        Ok(BlockAccess::put(&self.pool, block).await?)
     }
 }

@@ -209,7 +213,8 @@ impl ceramic_api::EventService for EventService {
         offset: usize,
         limit: usize,
     ) -> anyhow::Result<Vec<(Cid, Vec<u8>)>> {
-        CeramicOneEvent::range_with_values(&self.pool, &range.start..&range.end, offset, limit)
+        self.event_access
+            .range_with_values(&range.start..&range.end, offset, limit)
             .await?
             .into_iter()
             .map(|(event_id, value)| {
@@ -224,11 +229,11 @@ impl ceramic_api::EventService for EventService {
     }

     async fn value_for_order_key(&self, key: &EventId) -> anyhow::Result<Option<Vec<u8>>> {
-        Ok(CeramicOneEvent::value_by_order_key(&self.pool, key).await?)
+        Ok(self.event_access.value_by_order_key(key).await?)
     }

     async fn value_for_cid(&self, key: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
-        Ok(CeramicOneEvent::value_by_cid(&self.pool, key).await?)
+        Ok(self.event_access.value_by_cid(key).await?)
     }

     async fn events_since_highwater_mark(
@@ -239,8 +244,10 @@ impl ceramic_api::EventService for EventService {
     ) -> anyhow::Result<(i64, Vec<ceramic_api::EventDataResult>)> {
         let res = match include_data {
             ceramic_api::IncludeEventData::None => {
-                let (hw, cids) =
-                    CeramicOneEvent::new_events_since_value(&self.pool, highwater, limit).await?;
+                let (hw, cids) = self
+                    .event_access
+                    .new_events_since_value(highwater, limit)
+                    .await?;
                 let res = cids
                     .into_iter()
                     .map(|cid| ceramic_api::EventDataResult::new(cid, None))
@@ -248,9 +255,10 @@ impl ceramic_api::EventService for EventService {
                 (hw, res)
             }
             ceramic_api::IncludeEventData::Full => {
-                let (hw, data) =
-                    CeramicOneEvent::new_events_since_value_with_data(&self.pool, highwater, limit)
-                        .await?;
+                let (hw, data) = self
+                    .event_access
+                    .new_events_since_value_with_data(highwater, limit)
+                    .await?;
                 let mut res = Vec::with_capacity(data.len());
                 for row in data {
                     res.push(ceramic_api::EventDataResult::new(
@@ -266,11 +274,11 @@ impl ceramic_api::EventService for EventService {
     }

     async fn highwater_mark(&self) -> anyhow::Result<i64> {
-        Ok(CeramicOneEvent::get_highwater_mark(&self.pool).await?)
+        Ok(self.event_access.get_highwater_mark().await?)
     }

     async fn get_block(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
-        let block = CeramicOneBlock::get(&self.pool, cid).await?;
+        let block = BlockAccess::get(&self.pool, cid).await?;
         Ok(block.map(|b| b.data.to_vec()))
     }
 }
@@ -295,7 +303,8 @@ impl ceramic_anchor_service::Store for EventService {
                     .map_err(|e| anyhow!("could not create EventInsertable: {}", e))
             })
             .collect::<Result<Vec<_>>>()?;
-        CeramicOneEvent::insert_many(&self.pool, items.iter())
+        self.event_access
+            .insert_many(items.iter())
             .await
             .context("anchoring insert_many failed")?;
         Ok(())
@@ -308,15 +317,10 @@ impl ceramic_anchor_service::Store for EventService {
         limit: i64,
     ) -> Result<Vec<AnchorRequest>> {
         // Fetch event CIDs from the events table using the previous high water mark
-        Ok(
-            CeramicOneEvent::data_events_by_informant(
-                &self.pool,
-                informant,
-                high_water_mark,
-                limit,
-            )
+        Ok(self
+            .event_access
+            .data_events_by_informant(informant, high_water_mark, limit)
             .await
-            .map_err(|e| Error::new_app(anyhow!("could not fetch events by informant: {}", e)))?,
-        )
+            .map_err(|e| Error::new_app(anyhow!("could not fetch events by informant: {}", e)))?)
     }
 }
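The `events_since_highwater_mark` plumbing above keeps the API's paging contract: feed the returned mark back in until nothing new comes back. A standalone sketch of that contract over toy `(delivered, cid)` rows (the real query also adjusts the mark by one, as seen later in `new_events_since_value`):

```rust
// Toy rows: (delivered, cid). The real store returns (new_highwater, cids)
// and the caller loops, passing the mark back in until a page is empty.
fn events_since(store: &[(i64, String)], highwater: i64, limit: usize) -> (i64, Vec<String>) {
    let page: Vec<_> = store
        .iter()
        .filter(|(d, _)| *d > highwater)
        .take(limit)
        .collect();
    // Default to the passed-in value when there is nothing new, so a client
    // never goes backwards (mirrors the comment in the real query code).
    let hw = page.last().map_or(highwater, |(d, _)| *d);
    (hw, page.into_iter().map(|(_, cid)| cid.clone()).collect())
}

fn main() {
    let store = vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())];
    let (hw, batch) = events_since(&store, 0, 2);
    assert_eq!((hw, batch.len()), (2, 2));
    let (hw, batch) = events_since(&store, hw, 2);
    assert_eq!((hw, batch.len()), (3, 1));
    // An empty page returns the mark unchanged; the caller can stop.
    assert_eq!(events_since(&store, hw, 2).1.len(), 0);
}
```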
diff --git a/event-svc/src/event/validator/event.rs b/event-svc/src/event/validator/event.rs
index ffd406c3..d0a12fde 100644
--- a/event-svc/src/event/validator/event.rs
+++ b/event-svc/src/event/validator/event.rs
@@ -7,6 +7,7 @@ use recon::ReconItem;
 use tokio::try_join;

 use crate::event::validator::EthRpcProvider;
+use crate::store::EventAccess;
 use crate::{
     event::{
         service::{ValidationError, ValidationRequirement},
@@ -16,7 +17,7 @@ use crate::{
             time::{ChainInclusionError, TimeEventValidator},
         },
     },
-    store::{EventInsertable, SqlitePool},
+    store::EventInsertable,
     Result,
 };
@@ -117,7 +118,7 @@
 #[derive(Debug)]
 pub struct EventValidator {
-    pool: SqlitePool,
+    event_access: Arc<EventAccess>,
     /// The validator to use for time events if enabled.
     /// It contains the ethereum RPC providers and lives for the life of the [`EventValidator`].
     /// The [`SignedEventValidator`] is currently constructed on a per validation request basis
@@ -128,13 +129,13 @@ impl EventValidator {
     /// Create a new event validator
     pub async fn try_new(
-        pool: SqlitePool,
+        event_access: Arc<EventAccess>,
         ethereum_rpc_providers: Vec<EthRpcProvider>,
     ) -> Result<Self> {
         let time_event_verifier = TimeEventValidator::new_with_providers(ethereum_rpc_providers);

         Ok(Self {
-            pool,
+            event_access,
             time_event_verifier,
         })
     }
@@ -192,7 +193,7 @@ impl EventValidator {
                 }
             };
             SignedEventValidator::validate_events(
-                &self.pool,
+                Arc::clone(&self.event_access),
                 &opts,
                 events,
                 validation_req.require_local_init,
@@ -206,7 +207,7 @@ impl EventValidator {
             // TODO: better transient error handling from RPC client
             match self
                 .time_event_verifier
-                .validate_chain_inclusion(&self.pool, time_event.as_time())
+                .validate_chain_inclusion(time_event.as_time())
                 .await
             {
                 Ok(_t) => {
@@ -262,6 +263,7 @@ impl EventValidator {

 #[cfg(test)]
 mod test {
+    use ceramic_sql::sqlite::SqlitePool;
     use test_log::test;

     use crate::tests::{build_recon_item_with_controller, get_n_events};
@@ -288,9 +290,10 @@ mod test {
     #[test(tokio::test)]
     async fn valid_invalid_pending_recon() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
         let events = get_validation_events().await;

-        let validated = EventValidator::try_new(pool, vec![])
+        let validated = EventValidator::try_new(event_access, vec![])
             .await
             .unwrap()
             .validate_events(Some(&ValidationRequirement::new_recon()), events)
@@ -312,9 +315,10 @@ mod test {
     #[test(tokio::test)]
     async fn valid_invalid_pending_local() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
         let events = get_validation_events().await;

-        let validated = EventValidator::try_new(pool, vec![])
+        let validated = EventValidator::try_new(event_access, vec![])
             .await
             .unwrap()
             .validate_events(Some(&ValidationRequirement::new_local()), events)
@@ -336,9 +340,10 @@ mod test {
     #[test(tokio::test)]
     async fn always_valid_when_none() {
         let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
         let events = get_validation_events().await;

-        let validated = EventValidator::try_new(pool, vec![])
+        let validated = EventValidator::try_new(event_access, vec![])
             .await
             .unwrap()
             .validate_events(None, events)
diff --git a/event-svc/src/event/validator/signed.rs b/event-svc/src/event/validator/signed.rs
index cd8d8b8c..d53a7a61 100644
--- a/event-svc/src/event/validator/signed.rs
+++ b/event-svc/src/event/validator/signed.rs
@@ -10,11 +10,7 @@ use cid::Cid;
 use ipld_core::ipld::Ipld;
 use tracing::warn;

-use crate::{
-    event::service::ValidationError,
-    store::{CeramicOneEvent, SqlitePool},
-    Result,
-};
+use crate::{event::service::ValidationError, store::EventAccess, Result};

 use super::{
     event::{ValidatedEvent, ValidatedEvents},
@@ -82,7 +78,7 @@ impl SignedEventValidator {
     }

     pub async fn validate_events(
-        pool: &SqlitePool,
+        event_access: Arc<EventAccess>,
         opts: &VerifyJwsOpts,
         events: SignedValidationBatch,
         require_init_event: bool,
@@ -102,14 +98,14 @@ impl SignedEventValidator {

         for event in events.init {
             let status = validator
-                .validate_signed(pool, opts, event.as_signed())
+                .validate_signed(Arc::clone(&event_access), opts, event.as_signed())
                 .await?;
             validator.process_result(status, event.into_inner());
         }

         for event in events.data {
             let status = validator
-                .validate_signed(pool, opts, event.as_signed())
+                .validate_signed(Arc::clone(&event_access), opts, event.as_signed())
                 .await?;
             validator.process_result(status, event.into_inner());
         }
@@ -152,7 +148,7 @@ impl SignedEventValidator {
     /// Returns an enum representing the validation status and what type of event it was
     async fn validate_signed(
         &mut self,
-        pool: &SqlitePool,
+        event_access: Arc<EventAccess>,
         opts: &VerifyJwsOpts,
         signed: &signed::Event<Ipld>,
     ) -> Result {
@@ -161,7 +157,7 @@ impl SignedEventValidator {
             let init = if let Some(init) = self.init_map.get(d.id()) {
                 init.to_owned()
             } else {
-                match CeramicOneEvent::value_by_cid(pool, d.id()).await? {
+                match event_access.value_by_cid(d.id()).await? {
                     Some(init) => {
                         let (init_cid, init) =
                             unvalidated::Event::<Ipld>::decode_car(
diff --git a/event-svc/src/event/validator/time.rs b/event-svc/src/event/validator/time.rs
index 5afadf97..d073a835 100644
--- a/event-svc/src/event/validator/time.rs
+++ b/event-svc/src/event/validator/time.rs
@@ -4,7 +4,6 @@ use anyhow::{anyhow, bail, Result};
 use ceramic_core::ssi::caip2;
 use ceramic_core::Cid;
 use ceramic_event::unvalidated;
-use ceramic_sql::sqlite::SqlitePool;
 use multihash::Multihash;
 use once_cell::sync::Lazy;
 use tracing::warn;
@@ -123,7 +122,6 @@ impl TimeEventValidator {
     /// Validate the chain inclusion proof for a time event, returning the block timestamp if found
     pub async fn validate_chain_inclusion(
         &self,
-        _pool: &SqlitePool,
         event: &unvalidated::TimeEvent,
     ) -> Result {
         let chain_id = caip2::ChainId::from_str(event.proof().chain_id())
@@ -420,11 +418,10 @@ mod test {
     #[test(tokio::test)]
     async fn valid_proof_single() {
         let event = time_event_single_event_batch();
-        let pool = SqlitePool::connect_in_memory().await.unwrap();

         let verifier =
             get_mock_provider(SINGLE_TX_HASH.to_string(), SINGLE_TX_HASH_INPUT.into()).await;
-        match verifier.validate_chain_inclusion(&pool, &event).await {
+        match verifier.validate_chain_inclusion(&event).await {
             Ok(ts) => {
                 assert_eq!(ts.as_unix_ts(), BLOCK_TIMESTAMP);
             }
@@ -435,11 +432,10 @@ mod test {
     #[test(tokio::test)]
     async fn invalid_proof_single() {
         let event = time_event_single_event_batch();
-        let pool = SqlitePool::connect_in_memory().await.unwrap();

         let verifier =
             get_mock_provider(SINGLE_TX_HASH.to_string(), MULTI_TX_HASH_INPUT.to_string()).await;
-        match verifier.validate_chain_inclusion(&pool, &event).await {
+        match verifier.validate_chain_inclusion(&event).await {
             Ok(v) => {
                 panic!("should have failed: {:?}", v)
             }
@@ -457,11 +453,10 @@ mod test {
     #[test(tokio::test)]
     async fn valid_proof_multi() {
         let event = time_event_multi_event_batch();
-        let pool = SqlitePool::connect_in_memory().await.unwrap();

         let verifier =
             get_mock_provider(MULTI_TX_HASH.to_string(), MULTI_TX_HASH_INPUT.to_string()).await;
-        match verifier.validate_chain_inclusion(&pool, &event).await {
+        match verifier.validate_chain_inclusion(&event).await {
             Ok(ts) => {
                 assert_eq!(ts.as_unix_ts(), BLOCK_TIMESTAMP);
             }
@@ -472,11 +467,10 @@ mod test {
     #[test(tokio::test)]
     async fn invalid_root_tx_proof_cid_multi() {
         let event = time_event_multi_event_batch();
-        let pool = SqlitePool::connect_in_memory().await.unwrap();

         let verifier =
             get_mock_provider(MULTI_TX_HASH.to_string(), SINGLE_TX_HASH_INPUT.to_string()).await;
-        match verifier.validate_chain_inclusion(&pool, &event).await {
+        match verifier.validate_chain_inclusion(&event).await {
             Ok(v) => {
                 panic!("should have failed: {:?}", v)
             }
diff --git a/event-svc/src/store/mod.rs b/event-svc/src/store/mod.rs
index fc8a8e53..2844b561 100644
--- a/event-svc/src/store/mod.rs
+++ b/event-svc/src/store/mod.rs
@@ -6,6 +6,6 @@ mod sql;
 pub use metrics::{Metrics, StoreMetricsMiddleware};
 pub use sql::{
     entities::{BlockHash, EventBlockRaw, EventInsertable},
-    CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneVersion, Error,
-    EventRowDelivered, InsertResult, InsertedEvent, Result, SqlitePool, SqliteRootStore,
+    BlockAccess, Error, EventAccess, EventBlockAccess, EventRowDelivered, InsertResult,
+    InsertedEvent, Result, SqlitePool, SqliteRootStore, VersionAccess,
 };
diff --git a/event-svc/src/store/sql/access/block.rs b/event-svc/src/store/sql/access/block.rs
index 3e2027fd..163f903f 100644
--- a/event-svc/src/store/sql/access/block.rs
+++ b/event-svc/src/store/sql/access/block.rs
@@ -11,9 +11,9 @@ use crate::store::{
 };

 /// Access to the block table and related logic
-pub struct CeramicOneBlock {}
+pub struct BlockAccess {}

-impl CeramicOneBlock {
+impl BlockAccess {
     /// Insert a block in a transaction (i.e. when creating an event)
     pub(crate) async fn insert(
         conn: &mut SqliteTransaction<'_>,
@@ -69,7 +69,7 @@ impl BlockAccess {
     /// Add a block to the database
     pub async fn put(pool: &SqlitePool, block: &iroh_bitswap::Block) -> Result<bool> {
         let mut tx = pool.begin_tx().await?;
-        let new = CeramicOneBlock::insert(&mut tx, block.cid().hash(), block.data()).await?;
+        let new = BlockAccess::insert(&mut tx, block.cid().hash(), block.data()).await?;
         tx.commit().await?;
         Ok(new)
     }
diff --git a/event-svc/src/store/sql/access/event.rs b/event-svc/src/store/sql/access/event.rs
index 98178491..131705c8 100644
--- a/event-svc/src/store/sql/access/event.rs
+++ b/event-svc/src/store/sql/access/event.rs
@@ -10,6 +10,7 @@ use ceramic_core::{event_id::InvalidEventId, Cid, EventId, NodeId};
 use ceramic_event::unvalidated;
 use ceramic_sql::sqlite::{SqlitePool, SqliteTransaction};
 use ipld_core::ipld::Ipld;
+use itertools::Itertools;
 use recon::{AssociativeHash, HashCount, Key, Sha256a};

 use crate::store::{
@@ -20,11 +21,9 @@ use crate::store::{
         },
         query::{EventQuery, ReconQuery, SqlBackend},
     },
-    CeramicOneBlock, CeramicOneEventBlock, Error, Result,
+    BlockAccess, Error, EventBlockAccess, Result,
 };

-static GLOBAL_COUNTER: AtomicI64 = AtomicI64::new(0);
-
 #[derive(Debug)]
 /// An event that was inserted into the database
 pub struct InsertedEvent<'a> {
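Deleting `GLOBAL_COUNTER` above is the core of this refactor: the delivered counter becomes per-`EventAccess` state instead of process-wide. A standalone sketch of the behavioral difference, using a stand-in struct — two stores (for example, two in-memory test databases) no longer interleave their numbering:

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Stand-in for EventAccess: the counter lives on the instance, and the
// instance is tied to one pool, so counts can no longer cross databases.
struct Access {
    delivered_counter: AtomicI64,
}

impl Access {
    fn next_deliverable(&self) -> i64 {
        self.delivered_counter.fetch_add(1, Ordering::SeqCst)
    }
}

fn main() {
    let a = Access { delivered_counter: AtomicI64::new(0) };
    let b = Access { delivered_counter: AtomicI64::new(0) };
    a.next_deliverable();
    a.next_deliverable();
    // With the old process-wide static, b would observe a's increments.
    assert_eq!(b.next_deliverable(), 0);
    assert_eq!(a.next_deliverable(), 2);
}
```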
@@ -70,15 +69,34 @@ pub struct EventRowDelivered {
 }

 /// Access to the ceramic event table and related logic
-pub struct CeramicOneEvent {}
+/// Unlike other access models, the event access must maintain state.
+#[derive(Debug)]
+pub struct EventAccess {
+    // The delivered count is tied to a specific sql db, so we keep a reference to the pool here
+    // to ensure we cannot mix delivered counts across dbs.
+    pool: SqlitePool,
+    delivered_counter: AtomicI64,
+}

-impl CeramicOneEvent {
-    fn next_deliverable() -> i64 {
-        GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst)
+impl EventAccess {
+    /// Gain access to the events table.
+    /// EventAccess is stateful and the state is tied to the given connection pool; as such, a pool
+    /// must be provided so as not to accidentally mix connections to different dbs.
+    pub async fn try_new(pool: SqlitePool) -> Result<Self> {
+        let s = Self {
+            pool,
+            delivered_counter: Default::default(),
+        };
+        s.init_delivered_order().await?;
+        Ok(s)
+    }
+    fn next_deliverable(&self) -> i64 {
+        self.delivered_counter.fetch_add(1, Ordering::SeqCst)
     }

     /// Insert the event and its hash into the ceramic_one_event table
     async fn insert_event(
+        &self,
         tx: &mut SqliteTransaction<'_>,
         key: &EventId,
         stream_cid: &Cid,
@@ -93,7 +111,7 @@ impl EventAccess {
             .ok_or_else(|| Error::new_app(anyhow!("Event CID is required")))?;
         let hash = Sha256a::digest(key);
         let delivered: Option<i64> = if deliverable {
-            Some(Self::next_deliverable())
+            Some(self.next_deliverable())
         } else {
             None
         };
@@ -128,33 +146,40 @@ impl EventAccess {
             Err(err) => Err(err.into()),
         }
     }
-}

-impl CeramicOneEvent {
     /// Initialize the delivered event counter. Should be called on startup.
-    pub async fn init_delivered_order(pool: &SqlitePool) -> Result<()> {
+    async fn init_delivered_order(&self) -> Result<()> {
         let max_delivered: CountRow = sqlx::query_as(EventQuery::max_delivered())
-            .fetch_one(pool.reader())
+            .fetch_one(self.pool.reader())
             .await?;

         let max = max_delivered
             .res
             .checked_add(1)
             .ok_or_else(|| Error::new_fatal(anyhow!("More than i64::MAX delivered events!")))?;
-        GLOBAL_COUNTER.fetch_max(max, std::sync::atomic::Ordering::SeqCst);
+        self.delivered_counter
+            .fetch_max(max, std::sync::atomic::Ordering::SeqCst);

         Ok(())
     }

+    /// Start a new transaction
+    pub async fn begin_tx(&self) -> Result<SqliteTransaction<'_>> {
+        self.pool.begin_tx().await
+    }
+
     /// Get the current highwater mark for delivered events.
-    pub async fn get_highwater_mark(_pool: &SqlitePool) -> Result<i64> {
-        Ok(GLOBAL_COUNTER.load(Ordering::Relaxed))
+    pub async fn get_highwater_mark(&self) -> Result<i64> {
+        Ok(self.delivered_counter.load(Ordering::Relaxed))
     }

     /// Mark an event ready to deliver to js-ceramic or other clients. This implies it's valid and its previous events are known.
-    pub async fn mark_ready_to_deliver(conn: &mut SqliteTransaction<'_>, key: &Cid) -> Result<()> {
+    pub async fn mark_ready_to_deliver(
+        &self,
+        conn: &mut SqliteTransaction<'_>,
+        key: &Cid,
+    ) -> Result<()> {
         // Fetch add happens with an open transaction (on one writer for the db) so we're guaranteed to get a unique value
         sqlx::query(EventQuery::mark_ready_to_deliver())
-            .bind(Self::next_deliverable())
+            .bind(self.next_deliverable())
             .bind(key.to_bytes())
             .execute(&mut **conn.inner())
             .await?;
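`try_new` runs `init_delivered_order` before handing out the instance, so the counter starts above anything already in the table. A standalone sketch of that seeding, with a toy value in place of the `max_delivered` query:

```rust
use std::sync::atomic::{AtomicI64, Ordering};

fn main() {
    let counter = AtomicI64::new(0);
    let max_delivered_in_db: i64 = 41; // stand-in for SELECT MAX(delivered)
    let next = max_delivered_in_db.checked_add(1).expect("counter overflow");
    // fetch_max keeps the larger of the current and seeded values, so
    // re-running initialization can never move the counter backwards.
    counter.fetch_max(next, Ordering::SeqCst);
    assert_eq!(counter.fetch_add(1, Ordering::SeqCst), 42);
}
```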
@@ -170,33 +195,34 @@ impl EventAccess {
     /// That is, events will be processed in the order they are given so earlier events are given a lower global ordering
     /// and will be returned earlier in the feed. Events can be interleaved with different streams, but if two events
     /// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers.
-    pub async fn insert_many<'a, I>(pool: &SqlitePool, to_add: I) -> Result<InsertResult<'a>>
+    pub async fn insert_many<'a, I>(&self, to_add: I) -> Result<InsertResult<'a>>
     where
         I: Iterator<Item = &'a EventInsertable>,
     {
         let mut inserted = Vec::new();
-        let mut tx = pool.begin_tx().await.map_err(Error::from)?;
+        let mut tx = self.pool.begin_tx().await.map_err(Error::from)?;

         for item in to_add {
-            let new_key = Self::insert_event(
-                &mut tx,
-                item.order_key(),
-                item.stream_cid(),
-                item.informant(),
-                item.deliverable(),
-                item.event().is_time_event(),
-            )
-            .await?;
+            let new_key = self
+                .insert_event(
+                    &mut tx,
+                    item.order_key(),
+                    item.stream_cid(),
+                    item.informant(),
+                    item.deliverable(),
+                    item.event().is_time_event(),
+                )
+                .await?;
             inserted.push(InsertedEvent::new(new_key, item));
             if new_key {
                 for block in item.get_raw_blocks().await?.iter() {
-                    CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?;
-                    CeramicOneEventBlock::insert(&mut tx, block).await?;
+                    BlockAccess::insert(&mut tx, block.multihash.inner(), &block.bytes).await?;
+                    EventBlockAccess::insert(&mut tx, block).await?;
                 }
             }
             // the item already existed so we didn't mark it as deliverable on insert
             if !new_key && item.deliverable() {
-                Self::mark_ready_to_deliver(&mut tx, item.cid()).await?;
+                self.mark_ready_to_deliver(&mut tx, item.cid()).await?;
             }
         }
         tx.commit().await.map_err(Error::from)?;
@@ -206,21 +232,18 @@ impl EventAccess {
     }

     /// Calculate the hash of a range of events
-    pub async fn hash_range(
-        pool: &SqlitePool,
-        range: Range<&EventId>,
-    ) -> Result<HashCount<Sha256a>> {
+    pub async fn hash_range(&self, range: Range<&EventId>) -> Result<HashCount<Sha256a>> {
         let row: ReconHash = sqlx::query_as(ReconQuery::hash_range(SqlBackend::Sqlite))
             .bind(range.start.as_bytes())
             .bind(range.end.as_bytes())
-            .fetch_one(pool.reader())
+            .fetch_one(self.pool.reader())
             .await?;
         Ok(HashCount::new(Sha256a::from(row.hash()), row.count()))
     }

     /// Find a range of event IDs
     pub async fn range(
-        pool: &SqlitePool,
+        &self,
         range: Range<&EventId>,
         offset: usize,
         limit: usize,
@@ -234,7 +257,7 @@ impl EventAccess {
             .bind(range.end.as_bytes())
             .bind(limit)
             .bind(offset)
-            .fetch_all(pool.reader())
+            .fetch_all(self.pool.reader())
             .await
             .map_err(Error::from)?;
         let rows = rows
@@ -246,7 +269,7 @@ impl EventAccess {

     /// Find a range of event IDs with their values. Should replace `range` when we move to discovering values and keys simultaneously.
     pub async fn range_with_values(
-        pool: &SqlitePool,
+        &self,
         range: Range<&EventId>,
         offset: usize,
         limit: usize,
@@ -260,7 +283,7 @@ impl EventAccess {
             .bind(range.end.as_bytes())
             .bind(limit)
             .bind(offset)
-            .fetch_all(pool.reader())
+            .fetch_all(self.pool.reader())
             .await?;

         let values = ReconEventBlockRaw::into_carfiles(all_blocks).await?;
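`insert_many` above makes one decision per item inside a single transaction. A standalone outline of that branch structure, with booleans standing in for the SQL effects (not the crate's types):

```rust
// Booleans stand in for the SQL effects of insert_many's per-item flow.
#[derive(Debug, PartialEq)]
enum Action {
    InsertedWithBlocks, // brand-new event: row plus its blocks are written
    MarkedDeliverable,  // known event that has since become deliverable
    NoOp,               // known event, nothing new to record
}

fn per_item(new_key: bool, deliverable: bool) -> Action {
    if new_key {
        Action::InsertedWithBlocks
    } else if deliverable {
        // It already existed, so it was not marked on insert; mark it now.
        Action::MarkedDeliverable
    } else {
        Action::NoOp
    }
}

fn main() {
    assert_eq!(per_item(true, false), Action::InsertedWithBlocks);
    assert_eq!(per_item(false, true), Action::MarkedDeliverable);
    assert_eq!(per_item(false, false), Action::NoOp);
}
```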
@@ -268,18 +291,18 @@ impl EventAccess {
     }

     /// Count the number of events in a range
-    pub async fn count(pool: &SqlitePool, range: Range<&EventId>) -> Result<usize> {
+    pub async fn count(&self, range: Range<&EventId>) -> Result<usize> {
         let row: CountRow = sqlx::query_as(ReconQuery::count(SqlBackend::Sqlite))
             .bind(range.start.as_bytes())
             .bind(range.end.as_bytes())
-            .fetch_one(pool.reader())
+            .fetch_one(self.pool.reader())
             .await?;
         Ok(row.res as usize)
     }

     /// Returns the root CIDs of all the events found after the given delivered value.
     pub async fn new_events_since_value(
-        pool: &SqlitePool,
+        &self,
         delivered: i64,
         limit: i64,
     ) -> Result<(i64, Vec<Cid>)> {
@@ -293,7 +316,7 @@ impl EventAccess {
             sqlx::query_as(EventQuery::new_delivered_events_id_only())
                 .bind(delivered)
                 .bind(limit)
-                .fetch_all(pool.reader())
+                .fetch_all(self.pool.reader())
                 .await?;
         let max: i64 = rows.last().map_or(delivered, |r| r.new_highwater_mark + 1);
@@ -324,7 +347,7 @@ impl EventAccess {
     /// The events are returned in order of their delivered value. The number of events
     /// returned is limited by the `limit` parameter passed to the function.
     pub async fn new_events_since_value_with_data(
-        pool: &SqlitePool,
+        &self,
         highwater: i64,
         limit: i64,
     ) -> Result<(i64, Vec<EventRowDelivered>)> {
@@ -355,7 +378,7 @@ impl EventAccess {
             sqlx::query_as(EventQuery::new_delivered_events_with_data())
                 .bind(highwater)
                 .bind(limit)
-                .fetch_all(pool.reader())
+                .fetch_all(self.pool.reader())
                 .await?;

         // default to the passed in value if there are no new events to avoid the client going back to 0
@@ -370,17 +393,30 @@ impl EventAccess {
             all_blocks.iter().map(|row| row.block.clone()).collect(),
         )
         .await?;
+
+        // We need to match up the delivered index with each event. However, all_blocks contains an
+        // item for each block within each event. We need to chunk all_blocks by event and then
+        // find the max delivered for each event. This creates an iterator with a single
+        // delivered value per event, which we can then zip with the parsed car files,
+        // as there is a 1:1 mapping.
+        let event_chunks = all_blocks
+            .into_iter()
+            .chunk_by(|block| block.block.order_key.clone());
+        let delivered_iter = event_chunks
+            .into_iter()
+            .map(|(_, event_chunk)| event_chunk.map(|block| block.delivered).max());
+
         let result: Result<Vec<EventRowDelivered>> = parsed
             .into_iter()
-            .zip(all_blocks.iter())
-            .map(|((_, carfile), block)| {
+            .zip(delivered_iter)
+            .map(|((_, carfile), delivered)| {
                 let (cid, event) =
                     unvalidated::Event::decode_car(carfile.as_slice(), false)
                         .map_err(|_| Error::new_fatal(anyhow!("Error parsing event row")))?;
                 Ok(EventRowDelivered {
                     cid,
                     event,
-                    delivered: block.delivered,
+                    delivered: delivered.expect("should always be one block per event"),
                 })
             })
             .collect();
@@ -395,7 +431,7 @@ impl EventAccess {
     /// Returns the events and their values, and the highwater mark of the last event.
     /// The highwater mark can be used on the next call to get the next batch of events and will be 0 when done.
     pub async fn undelivered_with_values(
-        pool: &SqlitePool,
+        &self,
         highwater_mark: i64,
         limit: i64,
     ) -> Result<(Vec<(Cid, unvalidated::Event<Ipld>)>, i64)> {
@@ -418,7 +454,7 @@ impl EventAccess {
             sqlx::query_as(EventQuery::undelivered_with_values())
                 .bind(highwater_mark)
                 .bind(limit)
-                .fetch_all(pool.reader())
+                .fetch_all(self.pool.reader())
                 .await?;

         let max_highwater = all_blocks.iter().map(|row| row.row_id).max().unwrap_or(0); // if there's nothing in the list we just return 0
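The `chunk_by` change above is worth seeing in isolation: rows arrive one per block, so they are grouped by event key, reduced to one delivered value per event, and zipped 1:1 with the decoded events. A standalone sketch with toy rows (requires the `itertools` crate, which the diff adds as an import):

```rust
use itertools::Itertools;

fn main() {
    // (order_key, delivered) rows, one per block, already ordered by event.
    let rows = vec![("ev-a", 7), ("ev-a", 7), ("ev-b", 9), ("ev-c", 8), ("ev-c", 8)];

    // Group consecutive rows by event key, then keep one delivered per group.
    let chunks = rows.into_iter().chunk_by(|(key, _)| *key);
    let delivered: Vec<i64> = chunks
        .into_iter()
        .map(|(_, chunk)| chunk.map(|(_, d)| d).max().expect("chunks are non-empty"))
        .collect();

    // Stand-in for the decoded carfiles: exactly one entry per event.
    let events = ["car-a", "car-b", "car-c"];
    let paired: Vec<_> = events.iter().zip(delivered).collect();
    assert_eq!(paired, vec![(&"car-a", 7), (&"car-b", 9), (&"car-c", 8)]);
}
```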
@@ -428,19 +464,19 @@ impl EventAccess {
     }

     /// Finds the event data by a given EventId i.e. "order key".
-    pub async fn value_by_order_key(pool: &SqlitePool, key: &EventId) -> Result<Option<Vec<u8>>> {
+    pub async fn value_by_order_key(&self, key: &EventId) -> Result<Option<Vec<u8>>> {
         let blocks: Vec<BlockRow> = sqlx::query_as(EventQuery::value_blocks_by_order_key_one())
             .bind(key.as_bytes())
-            .fetch_all(pool.reader())
+            .fetch_all(self.pool.reader())
             .await?;
         rebuild_car(blocks).await
     }

     /// Finds the event data by a given CID i.e. the root CID in the carfile of the event.
-    pub async fn value_by_cid(pool: &SqlitePool, key: &Cid) -> Result<Option<Vec<u8>>> {
+    pub async fn value_by_cid(&self, key: &Cid) -> Result<Option<Vec<u8>>> {
         let blocks: Vec<BlockRow> = sqlx::query_as(EventQuery::value_blocks_by_cid_one())
             .bind(key.to_bytes())
-            .fetch_all(pool.reader())
+            .fetch_all(self.pool.reader())
             .await?;
         rebuild_car(blocks).await
     }
@@ -448,7 +484,7 @@ impl EventAccess {
     /// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered.
     /// returns (bool, bool) = (exists, deliverable)
     /// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could.
-    pub async fn deliverable_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> {
+    pub async fn deliverable_by_cid(&self, key: &Cid) -> Result<(bool, bool)> {
         #[derive(sqlx::FromRow)]
         struct CidExists {
             exists: bool,
@@ -456,14 +492,14 @@ impl EventAccess {
         }
         let exist: Option<CidExists> = sqlx::query_as(EventQuery::value_delivered_by_cid())
             .bind(key.to_bytes())
-            .fetch_optional(pool.reader())
+            .fetch_optional(self.pool.reader())
             .await?;
         Ok(exist.map_or((false, false), |row| (row.exists, row.delivered)))
     }

     /// Fetch data event CIDs from a specified source that are above the current anchoring high water mark
     pub async fn data_events_by_informant(
-        pool: &SqlitePool,
+        &self,
         informant: NodeId,
         high_water_mark: i64,
         limit: i64,
@@ -504,7 +540,7 @@ impl EventAccess {
         .bind(informant.did_key())
         .bind(high_water_mark)
         .bind(limit)
-        .fetch_all(pool.reader())
+        .fetch_all(self.pool.reader())
         .await
         .map_err(Error::from)?;
         let rows = rows
diff --git a/event-svc/src/store/sql/access/event_block.rs b/event-svc/src/store/sql/access/event_block.rs
index bd8cf7ec..9042f978 100644
--- a/event-svc/src/store/sql/access/event_block.rs
+++ b/event-svc/src/store/sql/access/event_block.rs
@@ -6,9 +6,9 @@ use crate::store::{
 };

 /// Access to the event_block table and related logic
-pub struct CeramicOneEventBlock {}
+pub struct EventBlockAccess {}

-impl CeramicOneEventBlock {
+impl EventBlockAccess {
     /// Insert an event block in a transaction i.e. when storing a new ceramic event
     pub(crate) async fn insert(
         conn: &mut SqliteTransaction<'_>,
diff --git a/event-svc/src/store/sql/access/mod.rs b/event-svc/src/store/sql/access/mod.rs
index b27a9201..6f79976f 100644
--- a/event-svc/src/store/sql/access/mod.rs
+++ b/event-svc/src/store/sql/access/mod.rs
@@ -1,9 +1,9 @@
 mod block;
-pub mod event;
+mod event;
 mod event_block;
 mod version;

-pub use block::CeramicOneBlock;
-pub use event::{CeramicOneEvent, EventRowDelivered, InsertResult, InsertedEvent};
-pub use event_block::CeramicOneEventBlock;
-pub use version::CeramicOneVersion;
+pub use block::BlockAccess;
+pub use event::{EventAccess, EventRowDelivered, InsertResult, InsertedEvent};
+pub use event_block::EventBlockAccess;
+pub use version::VersionAccess;
diff --git a/event-svc/src/store/sql/access/event_block.rs b/event-svc/src/store/sql/access/event_block.rs
index bd8cf7ec..9042f978 100644
--- a/event-svc/src/store/sql/access/event_block.rs
+++ b/event-svc/src/store/sql/access/event_block.rs
@@ -6,9 +6,9 @@ use crate::store::{
 };

 /// Access to the event_block table and related logic
-pub struct CeramicOneEventBlock {}
+pub struct EventBlockAccess {}

-impl CeramicOneEventBlock {
+impl EventBlockAccess {
     /// Insert an event block in a transaction i.e. when storing a new ceramic event
     pub(crate) async fn insert(
         conn: &mut SqliteTransaction<'_>,
diff --git a/event-svc/src/store/sql/access/mod.rs b/event-svc/src/store/sql/access/mod.rs
index b27a9201..6f79976f 100644
--- a/event-svc/src/store/sql/access/mod.rs
+++ b/event-svc/src/store/sql/access/mod.rs
@@ -1,9 +1,9 @@
 mod block;
-pub mod event;
+mod event;
 mod event_block;
 mod version;

-pub use block::CeramicOneBlock;
-pub use event::{CeramicOneEvent, EventRowDelivered, InsertResult, InsertedEvent};
-pub use event_block::CeramicOneEventBlock;
-pub use version::CeramicOneVersion;
+pub use block::BlockAccess;
+pub use event::{EventAccess, EventRowDelivered, InsertResult, InsertedEvent};
+pub use event_block::EventBlockAccess;
+pub use version::VersionAccess;
diff --git a/event-svc/src/store/sql/access/version.rs b/event-svc/src/store/sql/access/version.rs
index 2f57f37d..fcc7080b 100644
--- a/event-svc/src/store/sql/access/version.rs
+++ b/event-svc/src/store/sql/access/version.rs
@@ -59,9 +59,9 @@ impl std::str::FromStr for SemVer {

 #[derive(Debug, Clone)]
 /// Access to ceramic version information
-pub struct CeramicOneVersion {}
+pub struct VersionAccess {}

-impl CeramicOneVersion {
+impl VersionAccess {
     /// Fetch the previous version from the database. May be None if no previous version exists.
     pub async fn fetch_previous(pool: &SqlitePool) -> Result> {
         let current = SemVer::from_str(env!("CARGO_PKG_VERSION"))?;
@@ -84,12 +84,12 @@ mod test {
     #[tokio::test]
     async fn insert_version() {
         let mem = SqlitePool::connect_in_memory().await.unwrap();
-        CeramicOneVersion::insert_current(&mem).await.unwrap();
+        VersionAccess::insert_current(&mem).await.unwrap();
     }

     #[tokio::test]
     async fn prev_version() {
         let mem = SqlitePool::connect_in_memory().await.unwrap();
-        CeramicOneVersion::fetch_previous(&mem).await.unwrap();
+        VersionAccess::fetch_previous(&mem).await.unwrap();
     }
 }
diff --git a/event-svc/src/store/sql/mod.rs b/event-svc/src/store/sql/mod.rs
index 1c70d08f..05cbc637 100644
--- a/event-svc/src/store/sql/mod.rs
+++ b/event-svc/src/store/sql/mod.rs
@@ -6,8 +6,8 @@ mod root;
 mod test;

 pub use access::{
-    CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneVersion, EventRowDelivered,
-    InsertResult, InsertedEvent,
+    BlockAccess, EventAccess, EventBlockAccess, EventRowDelivered, InsertResult, InsertedEvent,
+    VersionAccess,
 };
 pub use ceramic_sql::{sqlite::SqlitePool, Error, Result};
 pub use root::SqliteRootStore;
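For downstream code the rename is mechanical; the old `CeramicOne*` names map one-to-one onto the `*Access` re-exports above:

```rust
// Before:
// use ceramic_event_svc::store::{CeramicOneEvent, CeramicOneVersion};
// After:
use ceramic_event_svc::store::{EventAccess, VersionAccess};
```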
diff --git a/event-svc/src/store/sql/test.rs b/event-svc/src/store/sql/test.rs
index 0ae1fe1b..90b51cd9 100644
--- a/event-svc/src/store/sql/test.rs
+++ b/event-svc/src/store/sql/test.rs
@@ -14,7 +14,7 @@ use multihash_codetable::{Code, MultihashDigest};
 use serde_ipld_dagcbor::codec::DagCborCodec;
 use test_log::test;

-use crate::store::{CeramicOneEvent, EventInsertable, SqlitePool};
+use crate::store::{EventAccess, EventInsertable, SqlitePool};

 const MODEL_ID: &str = "k2t6wz4yhfp1r5pwi52gw89nzjbu53qk7m32o5iguw42c6knsaj0feuf927agb";
 const CONTROLLER: &str = "did:key:z6Mkqtw7Pj5Lv9xc4PgUYAnwfaVoMC6FRneGWVr5ekTEfKVL";
@@ -59,22 +59,25 @@ fn random_events(num: usize) -> Vec {
 #[test(tokio::test)]
 async fn hash_range_query() {
     let pool = SqlitePool::connect_in_memory().await.unwrap();
+    let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
     let events = random_events(2);
     let first = &events[0];
     let second = &events[1];

-    let x = CeramicOneEvent::insert_many(&pool, [first, second].into_iter())
+    let x = event_access
+        .insert_many([first, second].into_iter())
         .await
         .unwrap();

     assert_eq!(x.count_new_keys(), 2);

-    let hash = CeramicOneEvent::hash_range(
-        &pool,
-        &event_id_builder().with_min_event().build()..&event_id_builder().with_max_event().build(),
-    )
-    .await
-    .unwrap();
+    let hash = event_access
+        .hash_range(
+            &event_id_builder().with_min_event().build()
+                ..&event_id_builder().with_max_event().build(),
+        )
+        .await
+        .unwrap();
     expect!["71F104AFD1BCDBB85C1548F59DFF2A5FB50E21A23F1A65CCB2F38EF6D92FA659#2"]
         .assert_eq(&format!("{hash}"));
 }
@@ -85,20 +88,24 @@ async fn range_query() {
     let first = &events[0];
     let second = &events[1];
     let pool = SqlitePool::connect_in_memory().await.unwrap();
-    let x = CeramicOneEvent::insert_many(&pool, [first, second].into_iter())
+    let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
+
+    let x = event_access
+        .insert_many([first, second].into_iter())
         .await
         .unwrap();

     assert_eq!(x.count_new_keys(), 2);

-    let ids = CeramicOneEvent::range(
-        &pool,
-        &event_id_builder().with_min_event().build()..&event_id_builder().with_max_event().build(),
-        0,
-        usize::MAX,
-    )
-    .await
-    .unwrap();
+    let ids = event_access
+        .range(
+            &event_id_builder().with_min_event().build()
+                ..&event_id_builder().with_max_event().build(),
+            0,
+            usize::MAX,
+        )
+        .await
+        .unwrap();

     expect![[r#"
         [
@@ -146,7 +153,10 @@ async fn range_query() {
 #[test(tokio::test)]
 async fn undelivered_with_values() {
     let pool = SqlitePool::connect_in_memory().await.unwrap();
-    let (res, hw) = CeramicOneEvent::undelivered_with_values(&pool, 0, 10000)
+    let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
+
+    let (res, hw) = event_access
+        .undelivered_with_values(0, 10000)
         .await
         .unwrap();
     assert_eq!(res.len(), 0);
@@ -156,14 +166,16 @@ async fn undelivered_with_values() {
 #[test(tokio::test)]
 async fn range_with_values() {
     let pool = SqlitePool::connect_in_memory().await.unwrap();
-
-    let res = CeramicOneEvent::range_with_values(
-        &pool,
-        &event_id_builder().with_min_event().build()..&event_id_builder().with_max_event().build(),
-        0,
-        100000,
-    )
-    .await
-    .unwrap();
+    let event_access = Arc::new(EventAccess::try_new(pool).await.unwrap());
+
+    let res = event_access
+        .range_with_values(
+            &event_id_builder().with_min_event().build()
+                ..&event_id_builder().with_max_event().build(),
+            0,
+            100000,
+        )
+        .await
+        .unwrap();
     assert_eq!(res.len(), 0);
 }
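A sketch of how a caller might drain undelivered events in batches with the instance API, assuming the highwater semantics documented above (the returned mark feeds the next call; an empty batch means done); the batch size and error type here are arbitrary:

```rust
use ceramic_event_svc::store::EventAccess;

// Hypothetical batch-draining loop over undelivered events.
async fn drain_undelivered(event_access: &EventAccess) -> anyhow::Result<usize> {
    let mut highwater = 0;
    let mut total = 0;
    loop {
        let (batch, next) = event_access.undelivered_with_values(highwater, 1000).await?;
        if batch.is_empty() {
            break;
        }
        total += batch.len();
        highwater = next;
    }
    Ok(total)
}
```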
diff --git a/event-svc/src/tests/event.rs b/event-svc/src/tests/event.rs
index 789cae29..52b1e4c3 100644
--- a/event-svc/src/tests/event.rs
+++ b/event-svc/src/tests/event.rs
@@ -15,6 +15,7 @@ use itertools::Itertools;
 use prettytable::{Cell, Row, Table};
 use recon::Sha256a;
 use recon::{InsertResult, ReconItem};
+use test_log::test;

 use super::*;

@@ -604,20 +605,67 @@ where
     );
 }

-#[tokio::test]
+#[test(tokio::test)]
 async fn test_conclusion_events_since() -> Result<(), Box> {
     let pool = SqlitePool::connect_in_memory().await?;
     let service = EventService::try_new(pool, false, false, vec![]).await?;
     let test_events = generate_chained_events().await;

-    for event in test_events {
-        ceramic_api::EventService::insert_many(&service, vec![event], NodeId::random().0).await?;
-    }
+    ceramic_api::EventService::insert_many(
+        &service,
+        test_events
+            .into_iter()
+            .map(|(event_id, event)| {
+                ApiItem::new(
+                    event_id,
+                    event.encode_car().expect("test event should encode"),
+                )
+            })
+            .collect(),
+        NodeId::random().0,
+    )
+    .await?;

     // Fetch conclusion events
-    let conclusion_events = service.conclusion_events_since(0, 5).await?;
-    assert_eq!(conclusion_events.len(), 5);
+    let conclusion_events = service.conclusion_events_since(0, 6).await?;
+
+    expect![[r#"
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+        | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous |
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+        | 1 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 6e756c6c | [] |
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+        | 2 | Data | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 6e756c6c | [] |
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+        | 3 | Data | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerap4iyp25kvufzcesbi4ijfseeb4koayw2m2y2wtk3pkm3nb2iezaq | 7b2273747265616d32223a22646174615f31227d | [Cid(bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i)] |
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+        | 4 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq | 7b2273747265616d5f31223a22646174615f31227d | [Cid(bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q)] |
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+        | 5 | Time | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreiftj6l432kco7hnb6reklbd7bh2j4jbg5beuvtxp3rhgny7omgali | | [Cid(bagcqcerap4iyp25kvufzcesbi4ijfseeb4koayw2m2y2wtk3pkm3nb2iezaq)] |
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+        | 6 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqceraccgbaicjznz45ov4wgc3wnr62zqwba24sxzreqerlzdklidysfdq | 7b2273747265616d5f31223a22646174615f32227d | [Cid(bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq)] |
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+    "#]].assert_eq(&events_to_table(&conclusion_events));
+
+    // Fetch conclusion events, with non-zero watermark
+    let conclusion_events = service.conclusion_events_since(3, 6).await?;
+
+    expect![[r#"
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+        | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous |
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+        | 4 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq | 7b2273747265616d5f31223a22646174615f31227d | [Cid(bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q)] |
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+        | 5 | Time | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreiftj6l432kco7hnb6reklbd7bh2j4jbg5beuvtxp3rhgny7omgali | | [Cid(bagcqcerap4iyp25kvufzcesbi4ijfseeb4koayw2m2y2wtk3pkm3nb2iezaq)] |
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+        | 6 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqceraccgbaicjznz45ov4wgc3wnr62zqwba24sxzreqerlzdklidysfdq | 7b2273747265616d5f31223a22646174615f32227d | [Cid(bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq)] |
+        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
+    "#]].assert_eq(&events_to_table(&conclusion_events));
+
+    Ok(())
+}
+fn events_to_table(conclusion_events: &[ConclusionEvent]) -> String {
     // Create a table
     let mut table = Table::new();
     table.add_row(Row::new(vec![
@@ -632,9 +680,10 @@ async fn test_conclusion_events_since() -> Result<(), Box
         Cell::new("previous"),
     ]));

-    for (index, event) in conclusion_events.iter().enumerate() {
+    for event in conclusion_events {
         let (
             event_type,
+            index,
             stream_cid,
             stream_type,
             controller,
@@ -645,6 +694,7 @@ async fn test_conclusion_events_since() -> Result<(), Box
         ) = match event {
             ConclusionEvent::Data(data_event) => (
                 "Data",
+                &data_event.index,
                 &data_event.init.stream_cid,
                 &data_event.init.stream_type,
                 &data_event.init.controller,
@@ -655,6 +705,7 @@ async fn test_conclusion_events_since() -> Result<(), Box
             ),
             ConclusionEvent::Time(time_event) => (
                 "Time",
+                &time_event.index,
                 &time_event.init.stream_cid,
                 &time_event.init.stream_type,
                 &time_event.init.controller,
@@ -684,23 +735,5 @@ async fn test_conclusion_events_since() -> Result<(), Box
         ]));
     }

-    let table_string = table.to_string();
-
-    expect![[r#"
-        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
-        | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous |
-        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
-        | 0 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 6e756c6c | [] |
-        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
-        | 1 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq | 7b2273747265616d5f31223a22646174615f31227d | [Cid(bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q)] |
-        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
-        | 2 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqceraccgbaicjznz45ov4wgc3wnr62zqwba24sxzreqerlzdklidysfdq | 7b2273747265616d5f31223a22646174615f32227d | [Cid(bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq)] |
-        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
-        | 3 | Data | bagcqcerazudkhl4sectilrixkvshfutjbkbeqbnh754hh5oerst6txpviuca | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerazudkhl4sectilrixkvshfutjbkbeqbnh754hh5oerst6txpviuca | 6e756c6c | [] |
-        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
-        | 4 | Data | bagcqcerazudkhl4sectilrixkvshfutjbkbeqbnh754hh5oerst6txpviuca | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqceramv66jowkwdtxay44fppdcez5bhqfwa7hzystd3lofemuoetd64ra | 7b2273747265616d32223a22646174615f31227d | [Cid(bagcqcerazudkhl4sectilrixkvshfutjbkbeqbnh754hh5oerst6txpviuca)] |
-        +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
-    "#]].assert_eq(&table_string);
-
-    Ok(())
+    table.to_string()
 }
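The watermark behavior the two fetches in this test exercise is exclusive: `conclusion_events_since(3, 6)` returns only events with index greater than 3. A paging sketch built on that observed behavior (the cast and batch size are arbitrary, and the types are as used in this test):

```rust
// Hypothetical consumer that pages the whole feed by feeding the last seen
// event index back in as the next highwater mark.
async fn page_all(service: &EventService) -> anyhow::Result<Vec<ConclusionEvent>> {
    let mut all = Vec::new();
    let mut highwater = 0;
    loop {
        let batch = service.conclusion_events_since(highwater, 100).await?;
        let Some(last) = batch.last() else { break };
        highwater = match last {
            ConclusionEvent::Data(e) => e.index as i64,
            ConclusionEvent::Time(e) => e.index as i64,
        };
        all.extend(batch);
    }
    Ok(all)
}
```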
diff --git a/event-svc/src/tests/mod.rs b/event-svc/src/tests/mod.rs
index 39d6f7cc..04214734 100644
--- a/event-svc/src/tests/mod.rs
+++ b/event-svc/src/tests/mod.rs
@@ -2,9 +2,8 @@
 mod event;
 mod migration;
 mod ordering;

-use std::str::FromStr;
+use std::{str::FromStr, sync::Arc};

-use ceramic_api::ApiItem;
 use ceramic_core::{DidDocument, EventId, Network, StreamId};
 use ceramic_event::unvalidated::{self, signed};
 use cid::Cid;
@@ -14,6 +13,8 @@ use multihash_codetable::{Code, MultihashDigest};
 use rand::{thread_rng, Rng};
 use recon::ReconItem;

+use crate::store::EventAccess;
+
 const CONTROLLER: &str = "did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw";
 const SEP_KEY: &str = "model";
 const METAMODEL_STREAM_ID: &str = "kh4q0ozorrgaq2mezktnrmdwleo1d";
@@ -56,6 +57,10 @@ pub(crate) fn random_cid() -> Cid {
     let hash = MultihashDigest::digest(&Code::Sha2_256, &data);
     Cid::new_v1(0x00, hash)
 }
+pub(crate) fn deterministic_cid(data: &[u8]) -> Cid {
+    let hash = MultihashDigest::digest(&Code::Sha2_256, &data);
+    Cid::new_v1(0x00, hash)
+}

 #[derive(Debug)]
 pub(crate) struct TestEventInfo {
@@ -120,13 +125,11 @@ fn gen_rand_bytes() -> [u8; SIZE] {
 }

 pub(crate) async fn check_deliverable(
-    pool: &crate::store::SqlitePool,
+    event_access: Arc,
     cid: &Cid,
     deliverable: bool,
 ) {
-    let (exists, delivered) = crate::store::CeramicOneEvent::deliverable_by_cid(pool, cid)
-        .await
-        .unwrap();
+    let (exists, delivered) = event_access.deliverable_by_cid(cid).await.unwrap();
     assert!(exists);
     if deliverable {
         assert!(delivered, "{} should be delivered", cid);
@@ -144,6 +147,13 @@ pub(crate) async fn signer() -> signed::JwkSigner {
         .unwrap()
 }

+async fn unsigned_init_event(model: &StreamId) -> unvalidated::init::Event {
+    let init = unvalidated::Builder::init()
+        .with_controller(CONTROLLER.to_string())
+        .with_sep("model".to_string(), model.to_vec())
+        .build();
+    unvalidated::init::Event::new(init)
+}
 async fn init_event(model: &StreamId, signer: &signed::JwkSigner) -> signed::Event {
     let init = unvalidated::Builder::init()
         .with_controller(CONTROLLER.to_string())
@@ -166,6 +176,20 @@ async fn data_event(
     signed::Event::from_payload(unvalidated::Payload::Data(commit), signer.to_owned()).unwrap()
 }

+async fn time_event(
+    init_id: Cid,
+    prev: Cid,
+    chain_id: &str,
+    tx_hash: Cid,
+    tx_type: &str,
+) -> unvalidated::TimeEvent {
+    unvalidated::Builder::time()
+        .with_id(init_id)
+        .with_tx(chain_id.to_string(), tx_hash, tx_type.to_string())
+        .with_prev(prev)
+        .build()
+        .expect("test data should always build into time event")
+}
 // returns init + N events
 async fn get_init_plus_n_events_with_model(
@@ -235,45 +259,41 @@ pub(crate) async fn get_n_events(number: usize) -> Vec> {
 ///
-/// A `Vec>` containing 5 events:
+/// A `Vec<(EventId, unvalidated::Event)>` containing 6 events:
 /// - 3 events for the first stream (1 init event and 2 data events)
-/// - 2 events for the second stream (1 init event and 1 data event)
+/// - 3 events for the second stream (1 unsigned init event, 1 data event, and 1 time event)
 ///
 /// # Example
 ///
 /// ```rust
 /// let chained_events = generate_chained_events().await;
-/// assert_eq!(chained_events.len(), 5);
+/// assert_eq!(chained_events.len(), 6);
 /// ```
-pub(crate) async fn generate_chained_events() -> Vec {
-    let mut events: Vec = Vec::with_capacity(5);
+pub(crate) async fn generate_chained_events() -> Vec<(EventId, unvalidated::Event)> {
+    let mut events = Vec::with_capacity(6);
+    events.extend(generate_signed_stream_data().await);
+    events.extend(generate_unsigned_stream_data_anchored().await);
+    events
+}
+// Generates a stream with a signed init event and two data events.
+pub(crate) async fn generate_signed_stream_data() -> Vec<(EventId, unvalidated::Event)> {
     let signer = Box::new(signer().await);
-    let stream_id_1 = create_deterministic_stream_id_model(&[0x01]);
-    let stream_id_2 = create_meta_model_stream_id();
-    let init_1 = init_event(&stream_id_1, &signer).await;
-    let init_1_cid = init_1.envelope_cid();
-    let (event_id_1, car_1) = (
-        build_event_id(init_1_cid, init_1_cid, &stream_id_1),
-        init_1.encode_car().unwrap(),
-    );
-    let init_1_cid = event_id_1.cid().unwrap();
+    let stream_id = create_deterministic_stream_id_model(&[0x01]);
+    let init = init_event(&stream_id, &signer).await;
+    let init_cid = *init.envelope_cid();

     let data_1 = data_event(
-        init_1_cid,
-        init_1_cid,
+        init_cid,
+        init_cid,
         ipld!({
             "stream_1" : "data_1"
         }),
         &signer,
     )
     .await;
-    let (data_1_id, data_1_car) = (
-        build_event_id(data_1.envelope_cid(), &init_1_cid, &stream_id_1),
-        data_1.encode_car().unwrap(),
-    );
     let data_2 = data_event(
-        init_1_cid,
-        data_1_id.cid().unwrap(),
+        init_cid,
+        *data_1.envelope_cid(),
         ipld!({
             "stream_1" : "data_2"
         }),
@@ -281,41 +301,66 @@ pub(crate) async fn generate_chained_events() -> Vec {
     )
     .await;
-    let (data_2_id, data_2_car) = (
-        build_event_id(data_2.envelope_cid(), &init_1_cid, &stream_id_1),
-        data_2.encode_car().unwrap(),
-    );
+    vec![
+        (
+            build_event_id(&init_cid, &init_cid, &stream_id),
+            init.into(),
+        ),
+        (
+            build_event_id(data_1.envelope_cid(), &init_cid, &stream_id),
+            data_1.into(),
+        ),
+        (
+            build_event_id(data_2.envelope_cid(), &init_cid, &stream_id),
+            data_2.into(),
+        ),
+    ]
+}

-    let init_2 = init_event(&stream_id_2, &signer).await;
-    let init_2_cid = init_2.envelope_cid();
-    let (event_id_2, car_2) = (
-        build_event_id(init_2_cid, init_2_cid, &stream_id_2),
-        init_2.encode_car().unwrap(),
-    );
-    let init_2_cid = event_id_2.cid().unwrap();
+// Generates a stream with an unsigned init event, a data event, and a time event.
+pub(crate) async fn generate_unsigned_stream_data_anchored(
+) -> Vec<(EventId, unvalidated::Event)> {
+    let signer = Box::new(signer().await);
+    let stream_id = create_meta_model_stream_id();
+    let init = unsigned_init_event(&stream_id).await;
+    let init_cid = *init.cid();

-    let data_3 = data_event(
-        init_2_cid,
-        init_2_cid,
+    let data = data_event(
+        init_cid,
+        init_cid,
         ipld!({
             "stream2" : "data_1"
         }),
         &signer,
     )
     .await;
-    let (data_3_id, data_3_car) = (
-        build_event_id(data_3.envelope_cid(), &init_2_cid, &stream_id_2),
-        data_3.encode_car().unwrap(),
-    );
-
-    // push the events in the order they should be inserted
-    events.push(ApiItem::new(event_id_1, car_1));
-    events.push(ApiItem::new(data_1_id, data_1_car));
-    events.push(ApiItem::new(data_2_id, data_2_car));
-    events.push(ApiItem::new(event_id_2, car_2));
-    events.push(ApiItem::new(data_3_id, data_3_car));
+    let time = time_event(
+        init_cid,
+        *data.envelope_cid(),
+        "test:chain",
+        deterministic_cid(b"root cid"),
+        "test",
+    )
+    .await;

-    events
+    vec![
+        (
+            build_event_id(&init_cid, &init_cid, &stream_id),
+            init.into(),
+        ),
+        (
+            build_event_id(data.envelope_cid(), &init_cid, &stream_id),
+            data.into(),
+        ),
+        (
+            build_event_id(
+                &time.to_cid().expect("time event should always encode"),
+                &init_cid,
+                &stream_id,
+            ),
+            time.into(),
+        ),
+    ]
 }

 /// Creates a deterministic StreamId of type Model based on the provided initial data.
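The generators now hand back `(EventId, unvalidated::Event)` pairs instead of pre-encoded API items, so API-level tests encode at the call site; a sketch of that conversion, mirroring the updated test in tests/event.rs:

```rust
use ceramic_api::ApiItem;

// Hypothetical adapter from the tuple-returning generators to API items.
async fn chained_api_items() -> Vec<ApiItem> {
    generate_chained_events()
        .await
        .into_iter()
        .map(|(event_id, event)| {
            // encode_car serializes the event and all of its blocks to CAR bytes
            ApiItem::new(event_id, event.encode_car().expect("test event should encode"))
        })
        .collect()
}
```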
diff --git a/event-svc/src/tests/ordering.rs b/event-svc/src/tests/ordering.rs
index c63cf853..6581dbec 100644
--- a/event-svc/src/tests/ordering.rs
+++ b/event-svc/src/tests/ordering.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::{collections::HashMap, sync::Arc};

 use ceramic_api::{ApiItem, EventDataResult, EventService as ApiEventService, IncludeEventData};
 use ceramic_core::{EventId, NodeId};
@@ -75,7 +75,7 @@ async fn test_init_event_delivered() {
         ApiItem::new_arced(init.key.to_owned(), init.value.to_owned()),
     )
     .await;
-    check_deliverable(&store.pool, &init.key.cid().unwrap(), true).await;
+    check_deliverable(store.event_access, &init.key.cid().unwrap(), true).await;
 }

 #[test(tokio::test)]
@@ -104,14 +104,24 @@ async fn test_prev_exists_history_required() {
         ApiItem::new_arced(init.key.clone(), init.value.clone()),
     )
     .await;
-    check_deliverable(&store.pool, &init.key.cid().unwrap(), true).await;
+    check_deliverable(
+        Arc::clone(&store.event_access),
+        &init.key.cid().unwrap(),
+        true,
+    )
+    .await;

     add_and_assert_new_local_event(
         &store,
         ApiItem::new_arced(data.key.clone(), data.value.clone()),
     )
     .await;
-    check_deliverable(&store.pool, &data.key.cid().unwrap(), true).await;
+    check_deliverable(
+        Arc::clone(&store.event_access),
+        &data.key.cid().unwrap(),
+        true,
+    )
+    .await;

     let delivered = get_delivered_cids(&store).await;
     assert_eq!(2, delivered.len());
@@ -138,8 +148,18 @@ async fn test_prev_in_same_write_history_required() {
         .unwrap();
     let new = new.iter().filter(|v| v.success()).count();
     assert_eq!(2, new);
-    check_deliverable(&store.pool, &init.key.cid().unwrap(), true).await;
-    check_deliverable(&store.pool, &data.key.cid().unwrap(), true).await;
+    check_deliverable(
+        Arc::clone(&store.event_access),
+        &init.key.cid().unwrap(),
+        true,
+    )
+    .await;
+    check_deliverable(
+        Arc::clone(&store.event_access),
+        &data.key.cid().unwrap(),
+        true,
+    )
+    .await;

     let delivered = get_delivered_cids(&store).await;
     assert_eq!(2, delivered.len());
@@ -167,7 +187,12 @@ async fn test_missing_prev_pending_recon() {
     // now we add the init and we should see init, data 1 (first stored), data 2 (second stored) as highwater returns
     let data = &events[0];
     add_and_assert_new_recon_event(&store, data.to_owned()).await;
-    check_deliverable(&store.pool, &data.key.cid().unwrap(), true).await;
+    check_deliverable(
+        Arc::clone(&store.event_access),
+        &data.key.cid().unwrap(),
+        true,
+    )
+    .await;

     // This happens out of band, so give it a moment to make sure everything is updated
     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
@@ -191,7 +216,12 @@ async fn missing_prev_pending_recon_should_deliver_without_stream_update() {

     let data = &events[0];
     add_and_assert_new_recon_event(&store, data.to_owned()).await;
-    check_deliverable(&store.pool, &data.key.cid().unwrap(), true).await;
+    check_deliverable(
+        Arc::clone(&store.event_access),
+        &data.key.cid().unwrap(),
+        true,
+    )
+    .await;

     // now we add the second event, it should quickly become deliverable
     let data = &events[1];
@@ -223,17 +253,37 @@ async fn multiple_streams_missing_prev_recon_should_deliver_without_stream_updat
     // store the first event in both streams.
     // we could do insert as a list, but to make sure we have the ordering we expect at the end we do them one by one
     add_and_assert_new_recon_event(&store, s1_init.to_owned()).await;
-    check_deliverable(&store.pool, &s1_init.key.cid().unwrap(), true).await;
+    check_deliverable(
+        Arc::clone(&store.event_access),
+        &s1_init.key.cid().unwrap(),
+        true,
+    )
+    .await;

     add_and_assert_new_recon_event(&store, s2_init.to_owned()).await;
-    check_deliverable(&store.pool, &s2_init.key.cid().unwrap(), true).await;
+    check_deliverable(
+        Arc::clone(&store.event_access),
+        &s2_init.key.cid().unwrap(),
+        true,
+    )
+    .await;

     // now we add the third event for both and they should be stuck in pending
     add_and_assert_new_recon_event(&store, s1_3.to_owned()).await;
-    check_deliverable(&store.pool, &s1_3.key.cid().unwrap(), false).await;
+    check_deliverable(
+        Arc::clone(&store.event_access),
+        &s1_3.key.cid().unwrap(),
+        false,
+    )
+    .await;

     add_and_assert_new_recon_event(&store, s2_3.to_owned()).await;
-    check_deliverable(&store.pool, &s2_3.key.cid().unwrap(), false).await;
+    check_deliverable(
+        Arc::clone(&store.event_access),
+        &s2_3.key.cid().unwrap(),
+        false,
+    )
+    .await;

     tokio::time::sleep(std::time::Duration::from_millis(100)).await;
     let delivered = get_delivered_cids(&store).await;
@@ -348,10 +398,7 @@ async fn recon_lots_of_streams() {
     }
     // first just make sure they were all inserted (not delivered yet)
     for (i, cid) in all_cids.iter().enumerate() {
-        let (exists, _delivered) =
-            crate::store::CeramicOneEvent::deliverable_by_cid(&store.pool, cid)
-                .await
-                .unwrap();
+        let (exists, _delivered) = store.event_access.deliverable_by_cid(cid).await.unwrap();
         assert!(exists, "idx: {}. missing cid: {}", i, cid);
     }
@@ -380,7 +427,7 @@ async fn recon_lots_of_streams() {
     }
     // now we check that all the events are deliverable
     for cid in all_cids.iter() {
-        check_deliverable(&store.pool, cid, true).await;
+        check_deliverable(Arc::clone(&store.event_access), cid, true).await;
     }
     // and make sure the events were delivered for each stream streams in the same order as they were at the start
     for (i, stream) in expected_stream_order.iter().enumerate() {
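`check_deliverable` now takes the `Arc` by value, which is why call sites that consult the store more than once wrap it in `Arc::clone`; a condensed sketch of the pattern, using the helper names from this diff:

```rust
use std::sync::Arc;

use ceramic_core::Cid;
use ceramic_event_svc::store::EventAccess;

// Cloning the Arc bumps a reference count; the SQLite pool underneath is
// shared, not copied. The final call can move the Arc instead of cloning.
async fn assert_stream_delivered(event_access: Arc<EventAccess>, init: &Cid, data: &Cid) {
    check_deliverable(Arc::clone(&event_access), init, true).await;
    check_deliverable(event_access, data, true).await;
}
```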
diff --git a/event/src/unvalidated/event.rs b/event/src/unvalidated/event.rs
index 328e0cd7..4290a449 100644
--- a/event/src/unvalidated/event.rs
+++ b/event/src/unvalidated/event.rs
@@ -276,11 +276,22 @@ impl From> for Event {
     }
 }

+impl From for Event {
+    fn from(value: TimeEvent) -> Self {
+        Self::Time(Box::new(value))
+    }
+}
+
 impl From>> for Event {
     fn from(value: Box>) -> Self {
         Self::Unsigned(value)
     }
 }

+impl From> for Event {
+    fn from(value: init::Event) -> Self {
+        Self::Unsigned(Box::new(value))
+    }
+}
+
 impl From> for Event {
     fn from(value: signed::Event) -> Self {
@@ -357,6 +368,12 @@ impl TimeEvent {
     pub fn path(&self) -> &str {
         self.event.path.as_ref()
     }
+
+    /// Encode the event and report the CID
+    pub fn to_cid(
+        &self,
+    ) -> Result> {
+        self.event.to_cid()
+    }

     /// Encode the event into CAR bytes including all relevant blocks.
     pub fn encode_car(&self) -> anyhow::Result> {
         let (cid, event) = self.event.to_dag_cbor_block()?;
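These `From` impls are what let the test generators hand back plain `unvalidated::Event` values via `.into()`; a minimal sketch, with the payload type assumed to be `Ipld` as elsewhere in this diff:

```rust
use ceramic_event::unvalidated;
use ipld_core::ipld::Ipld;

// TimeEvent and init::Event now convert straight into the Event enum.
fn wrap_time(time: unvalidated::TimeEvent) -> unvalidated::Event<Ipld> {
    // Equivalent to unvalidated::Event::Time(Box::new(time))
    time.into()
}

fn wrap_init(init: unvalidated::init::Event<Ipld>) -> unvalidated::Event<Ipld> {
    // Equivalent to unvalidated::Event::Unsigned(Box::new(init))
    init.into()
}
```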