diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs index 480b75442..84a107c0e 100644 --- a/service/src/event/order_events.rs +++ b/service/src/event/order_events.rs @@ -42,7 +42,7 @@ impl OrderEvents { HashMap::from_iter(candidate_events.iter_mut().map(|(e, meta)| { // all init events are deliverable so we mark them as such before we do anything else if matches!(meta, EventMetadata::Init { .. }) { - e.body.set_deliverable(true); + e.set_deliverable(true); } (e.cid(), e.deliverable()) })); @@ -73,7 +73,7 @@ impl OrderEvents { Some(prev) => { if let Some(in_mem_is_deliverable) = new_cids.get(&prev) { if *in_mem_is_deliverable { - event.body.set_deliverable(true); + event.set_deliverable(true); *new_cids.get_mut(&event.cid()).expect("CID must exist") = true; deliverable.push((event, header)); } else { @@ -83,7 +83,7 @@ impl OrderEvents { let (_exists, prev_deliverable) = CeramicOneEvent::deliverable_by_cid(pool, &prev).await?; if prev_deliverable { - event.body.set_deliverable(true); + event.set_deliverable(true); *new_cids.get_mut(&event.cid()).expect("CID must exist") = true; deliverable.push((event, header)); } else { @@ -110,7 +110,7 @@ impl OrderEvents { Some(prev) => { if new_cids.get(&prev).map_or(false, |v| *v) { *new_cids.get_mut(&event.cid()).expect("CID must exist") = true; - event.body.set_deliverable(true); + event.set_deliverable(true); deliverable.push((event, header)); // reset the iteration count since we made changes. once it doesn't change for a loop through the queue we're done iteration = 0; @@ -171,10 +171,10 @@ mod test { let mut after_2 = Vec::with_capacity(stream_2.len()); for (event, _) in events { assert!(event.deliverable()); - if stream_1.iter().any(|e| e.key == event.order_key) { - after_1.push(event.order_key.clone()); + if stream_1.iter().any(|e| e.key == *event.order_key()) { + after_1.push(event.order_key().clone()); } else { - after_2.push(event.order_key.clone()); + after_2.push(event.order_key().clone()); } } @@ -290,7 +290,7 @@ mod test { .iter_mut() .take(3) .map(|(i, _)| { - i.body.set_deliverable(true); + i.set_deliverable(true); i.clone() }) .collect::>(); @@ -301,7 +301,7 @@ mod test { let expected = remaining .iter() - .map(|(i, _)| i.order_key.clone()) + .map(|(i, _)| i.order_key().clone()) .collect::>(); remaining.shuffle(&mut thread_rng()); @@ -314,7 +314,7 @@ mod test { let after = ordered .deliverable .iter() - .map(|(e, _)| e.order_key.clone()) + .map(|(e, _)| e.order_key().clone()) .collect::>(); assert_eq!(expected, after); } diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index 38093f20e..fcd40e708 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -630,7 +630,7 @@ mod test { async fn insert_10_with_9_undelivered(pool: &SqlitePool) { let insertable = get_n_insertable_events(10).await; let mut init = insertable.first().unwrap().to_owned(); - init.body.set_deliverable(true); + init.set_deliverable(true); let undelivered = insertable.into_iter().skip(1).collect::>(); let new = CeramicOneEvent::insert_many(pool, undelivered.iter()) diff --git a/service/src/event/service.rs b/service/src/event/service.rs index d2b791143..7a2b59b72 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -3,7 +3,7 @@ use std::collections::HashSet; use async_trait::async_trait; use ceramic_core::{EventId, Network}; use ceramic_event::unvalidated; -use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool}; +use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; use cid::Cid; use futures::stream::BoxStream; use ipld_core::ipld::Ipld; @@ -134,13 +134,11 @@ impl CeramicEventService { ))); } - let body = EventInsertableBody::try_from_carfile(cid, item.value.as_slice()).await?; + let event_insertable = + EventInsertable::try_from_carfile(item.key.to_owned(), item.value.as_slice()).await?; let metadata = EventMetadata::from(parsed_event); - Ok(( - EventInsertable::try_new(item.key.to_owned(), body)?, - metadata, - )) + Ok((event_insertable, metadata)) } pub(crate) async fn insert_events( @@ -169,7 +167,7 @@ impl CeramicEventService { let to_insert = ordered.deliverable().iter().map(|(e, _)| e); invalid.extend(ordered.missing_history().iter().map(|(e, _)| { InvalidItem::RequiresHistory { - key: e.order_key.clone(), + key: e.order_key().clone(), } })); CeramicOneEvent::insert_many(&self.pool, to_insert).await? diff --git a/store/src/lib.rs b/store/src/lib.rs index 76c013392..93536b8bb 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -9,7 +9,7 @@ mod sql; pub use error::Error; pub use metrics::{Metrics, StoreMetricsMiddleware}; pub use sql::{ - entities::{BlockHash, EventBlockRaw, EventInsertable, EventInsertableBody}, + entities::{BlockHash, EventBlockRaw, EventInsertable}, CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneVersion, InsertResult, InsertedEvent, Migrations, SqlitePool, SqliteRootStore, SqliteTransaction, }; diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs index 692e92610..bb344b76a 100644 --- a/store/src/sql/access/event.rs +++ b/store/src/sql/access/event.rs @@ -169,14 +169,14 @@ impl CeramicOneEvent { let mut tx = pool.begin_tx().await.map_err(Error::from)?; for item in to_add { - let new_key = Self::insert_event(&mut tx, &item.order_key, item.deliverable()).await?; + let new_key = Self::insert_event(&mut tx, item.order_key(), item.deliverable()).await?; inserted.push(InsertedEvent::new( - item.order_key.clone(), + item.order_key().clone(), new_key, item.deliverable(), )); if new_key { - for block in item.body.blocks().iter() { + for block in item.blocks().iter() { CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?; CeramicOneEventBlock::insert(&mut tx, block).await?; } diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs index 238d6c7df..d65d130eb 100644 --- a/store/src/sql/entities/event.rs +++ b/store/src/sql/entities/event.rs @@ -43,46 +43,7 @@ pub async fn rebuild_car(blocks: Vec) -> Result>> { /// The type we use to insert events into the database pub struct EventInsertable { /// The event order key (e.g. EventID) - pub order_key: EventId, - /// The data that makes up the event - pub body: EventInsertableBody, -} - -impl EventInsertable { - /// Try to build the EventInsertable struct from a carfile. - pub async fn try_from_carfile(order_key: EventId, body: &[u8]) -> Result { - let cid = order_key.cid().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", order_key)) - })?; - let body = EventInsertableBody::try_from_carfile(cid, body).await?; - Ok(Self { order_key, body }) - } - - /// Build the EventInsertable struct from an EventID and EventInsertableBody. - /// Will error if the CID in the EventID doesn't match the CID in the EventInsertableBody. - pub fn try_new(order_key: EventId, body: EventInsertableBody) -> Result { - if order_key.cid() != Some(body.cid()) { - return Err(Error::new_invalid_arg(anyhow!( - "EventID CID does not match the body CID" - ))); - } - Ok(Self { order_key, body }) - } - - /// Get the CID of the event - pub fn cid(&self) -> Cid { - self.body.cid - } - - /// Whether this event is deliverable currently - pub fn deliverable(&self) -> bool { - self.body.deliverable() - } -} - -#[derive(Debug, Clone)] -/// The type we use to insert events into the database -pub struct EventInsertableBody { + order_key: EventId, /// The event CID i.e. the root CID from the car file cid: Cid, /// Whether the event is deliverable i.e. it's prev has been delivered and the chain is continuous to an init event @@ -92,22 +53,30 @@ pub struct EventInsertableBody { blocks: Vec, } -impl EventInsertableBody { - /// Create a new EventInsertRaw struct. Deliverable is set to false by default. - pub fn new(cid: Cid, blocks: Vec, deliverable: bool) -> Self { +impl EventInsertable { + /// EventInsertable constructor + pub fn new(order_key: EventId, blocks: Vec, deliverable: bool) -> Self { + let cid = order_key.cid().unwrap(); + Self { + order_key, cid, deliverable, blocks, } } + /// Get the Recon order key (EventId) of the event. + pub fn order_key(&self) -> &EventId { + &self.order_key + } + /// Get the CID of the event pub fn cid(&self) -> Cid { self.cid } - /// Whether this event is deliverable currently + /// Underlying bytes that make up the event pub fn blocks(&self) -> &Vec { &self.blocks } @@ -123,30 +92,20 @@ impl EventInsertableBody { self.deliverable = deliverable; } - /// Find a block from the carfile for a given CID if it's included - pub fn block_for_cid_opt(&self, cid: &Cid) -> Option<&EventBlockRaw> { - self.blocks - .iter() - .find(|b| Cid::new_v1(b.codec.try_into().unwrap(), *b.multihash.inner()) == *cid) - } - - /// Find a block from the carfile for a given CID if it's included - pub fn block_for_cid(&self, cid: &Cid) -> Result<&EventBlockRaw> { - self.block_for_cid_opt(cid) - .ok_or_else(|| Error::new_app(anyhow!("Event data is missing data for CID {}", cid))) - } + /// Try to build the EventInsertable struct from a carfile. + pub async fn try_from_carfile(order_key: EventId, car_bytes: &[u8]) -> Result { + let event_cid = order_key.cid().ok_or_else(|| { + Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", order_key)) + })?; - /// Builds a new EventInsertRaw from a CAR file. Will error if the CID in the EventID doesn't match the - /// first root of the carfile. - pub async fn try_from_carfile(event_cid: Cid, val: &[u8]) -> Result { - if val.is_empty() { + if car_bytes.is_empty() { return Err(Error::new_app(anyhow!( "CAR file is empty: cid={}", event_cid )))?; } - let mut reader = CarReader::new(val) + let mut reader = CarReader::new(car_bytes) .await .map_err(|e| Error::new_app(anyhow!(e)))?; let root_cid = reader @@ -169,6 +128,11 @@ impl EventInsertableBody { blocks.push(ebr); idx += 1; } - Ok(Self::new(event_cid, blocks, false)) + Ok(Self { + order_key, + cid: event_cid, + blocks, + deliverable: false, + }) } } diff --git a/store/src/sql/entities/mod.rs b/store/src/sql/entities/mod.rs index 0be8fd9b7..acfa42554 100644 --- a/store/src/sql/entities/mod.rs +++ b/store/src/sql/entities/mod.rs @@ -6,7 +6,7 @@ mod utils; mod version; pub use block::{BlockBytes, BlockRow}; -pub use event::{rebuild_car, EventInsertable, EventInsertableBody}; +pub use event::{rebuild_car, EventInsertable}; pub use event_block::{EventBlockRaw, ReconEventBlockRaw}; pub use hash::{BlockHash, ReconHash}; pub use version::VersionRow; diff --git a/store/src/sql/test.rs b/store/src/sql/test.rs index ad905f7d7..116670662 100644 --- a/store/src/sql/test.rs +++ b/store/src/sql/test.rs @@ -8,7 +8,7 @@ use cid::Cid; use expect_test::expect; use test_log::test; -use crate::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool}; +use crate::{CeramicOneEvent, EventInsertable, SqlitePool}; const MODEL_ID: &str = "k2t6wz4yhfp1r5pwi52gw89nzjbu53qk7m32o5iguw42c6knsaj0feuf927agb"; const CONTROLLER: &str = "did:key:z6Mkqtw7Pj5Lv9xc4PgUYAnwfaVoMC6FRneGWVr5ekTEfKVL"; @@ -28,11 +28,7 @@ fn random_event(cid: &str) -> EventInsertable { let order_key = event_id_builder() .with_event(&Cid::from_str(cid).unwrap()) .build(); - let cid = order_key.cid().unwrap(); - EventInsertable { - order_key, - body: EventInsertableBody::new(cid, vec![], true), - } + EventInsertable::new(order_key, vec![], true) } #[test(tokio::test)]