Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Flatten EventInsertable type #480

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl OrderEvents {
HashMap::from_iter(candidate_events.iter_mut().map(|(e, meta)| {
// all init events are deliverable so we mark them as such before we do anything else
if matches!(meta, EventMetadata::Init { .. }) {
e.body.set_deliverable(true);
e.set_deliverable(true);
}
(e.cid(), e.deliverable())
}));
Expand Down Expand Up @@ -73,7 +73,7 @@ impl OrderEvents {
Some(prev) => {
if let Some(in_mem_is_deliverable) = new_cids.get(&prev) {
if *in_mem_is_deliverable {
event.body.set_deliverable(true);
event.set_deliverable(true);
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
deliverable.push((event, header));
} else {
Expand All @@ -83,7 +83,7 @@ impl OrderEvents {
let (_exists, prev_deliverable) =
CeramicOneEvent::deliverable_by_cid(pool, &prev).await?;
if prev_deliverable {
event.body.set_deliverable(true);
event.set_deliverable(true);
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
deliverable.push((event, header));
} else {
Expand All @@ -110,7 +110,7 @@ impl OrderEvents {
Some(prev) => {
if new_cids.get(&prev).map_or(false, |v| *v) {
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
event.body.set_deliverable(true);
event.set_deliverable(true);
deliverable.push((event, header));
// reset the iteration count since we made changes. once it doesn't change for a loop through the queue we're done
iteration = 0;
Expand Down Expand Up @@ -171,10 +171,10 @@ mod test {
let mut after_2 = Vec::with_capacity(stream_2.len());
for (event, _) in events {
assert!(event.deliverable());
if stream_1.iter().any(|e| e.key == event.order_key) {
after_1.push(event.order_key.clone());
if stream_1.iter().any(|e| e.key == *event.order_key()) {
after_1.push(event.order_key().clone());
} else {
after_2.push(event.order_key.clone());
after_2.push(event.order_key().clone());
}
}

Expand Down Expand Up @@ -290,7 +290,7 @@ mod test {
.iter_mut()
.take(3)
.map(|(i, _)| {
i.body.set_deliverable(true);
i.set_deliverable(true);
i.clone()
})
.collect::<Vec<_>>();
Expand All @@ -301,7 +301,7 @@ mod test {

let expected = remaining
.iter()
.map(|(i, _)| i.order_key.clone())
.map(|(i, _)| i.order_key().clone())
.collect::<Vec<_>>();
remaining.shuffle(&mut thread_rng());

Expand All @@ -314,7 +314,7 @@ mod test {
let after = ordered
.deliverable
.iter()
.map(|(e, _)| e.order_key.clone())
.map(|(e, _)| e.order_key().clone())
.collect::<Vec<_>>();
assert_eq!(expected, after);
}
Expand Down
2 changes: 1 addition & 1 deletion service/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ mod test {
async fn insert_10_with_9_undelivered(pool: &SqlitePool) {
let insertable = get_n_insertable_events(10).await;
let mut init = insertable.first().unwrap().to_owned();
init.body.set_deliverable(true);
init.set_deliverable(true);
let undelivered = insertable.into_iter().skip(1).collect::<Vec<_>>();

let new = CeramicOneEvent::insert_many(pool, undelivered.iter())
Expand Down
12 changes: 5 additions & 7 deletions service/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::HashSet;
use async_trait::async_trait;
use ceramic_core::{EventId, Network};
use ceramic_event::unvalidated;
use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool};
use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool};
use cid::Cid;
use futures::stream::BoxStream;
use ipld_core::ipld::Ipld;
Expand Down Expand Up @@ -134,13 +134,11 @@ impl CeramicEventService {
)));
}

let body = EventInsertableBody::try_from_carfile(cid, item.value.as_slice()).await?;
let event_insertable =
EventInsertable::try_from_carfile(item.key.to_owned(), item.value.as_slice()).await?;
let metadata = EventMetadata::from(parsed_event);

Ok((
EventInsertable::try_new(item.key.to_owned(), body)?,
metadata,
))
Ok((event_insertable, metadata))
}

pub(crate) async fn insert_events(
Expand Down Expand Up @@ -169,7 +167,7 @@ impl CeramicEventService {
let to_insert = ordered.deliverable().iter().map(|(e, _)| e);
invalid.extend(ordered.missing_history().iter().map(|(e, _)| {
InvalidItem::RequiresHistory {
key: e.order_key.clone(),
key: e.order_key().clone(),
}
}));
CeramicOneEvent::insert_many(&self.pool, to_insert).await?
Expand Down
2 changes: 1 addition & 1 deletion store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod sql;
pub use error::Error;
pub use metrics::{Metrics, StoreMetricsMiddleware};
pub use sql::{
entities::{BlockHash, EventBlockRaw, EventInsertable, EventInsertableBody},
entities::{BlockHash, EventBlockRaw, EventInsertable},
CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneVersion,
InsertResult, InsertedEvent, Migrations, SqlitePool, SqliteRootStore, SqliteTransaction,
};
Expand Down
6 changes: 3 additions & 3 deletions store/src/sql/access/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,14 @@ impl CeramicOneEvent {
let mut tx = pool.begin_tx().await.map_err(Error::from)?;

for item in to_add {
let new_key = Self::insert_event(&mut tx, &item.order_key, item.deliverable()).await?;
let new_key = Self::insert_event(&mut tx, item.order_key(), item.deliverable()).await?;
inserted.push(InsertedEvent::new(
item.order_key.clone(),
item.order_key().clone(),
new_key,
item.deliverable(),
));
if new_key {
for block in item.body.blocks().iter() {
for block in item.blocks().iter() {
CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?;
CeramicOneEventBlock::insert(&mut tx, block).await?;
}
Expand Down
88 changes: 26 additions & 62 deletions store/src/sql/entities/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,46 +43,7 @@ pub async fn rebuild_car(blocks: Vec<BlockRow>) -> Result<Option<Vec<u8>>> {
/// The type we use to insert events into the database
pub struct EventInsertable {
/// The event order key (e.g. EventID)
pub order_key: EventId,
/// The data that makes up the event
pub body: EventInsertableBody,
}

impl EventInsertable {
/// Try to build the EventInsertable struct from a carfile.
pub async fn try_from_carfile(order_key: EventId, body: &[u8]) -> Result<Self> {
let cid = order_key.cid().ok_or_else(|| {
Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", order_key))
})?;
let body = EventInsertableBody::try_from_carfile(cid, body).await?;
Ok(Self { order_key, body })
}

/// Build the EventInsertable struct from an EventID and EventInsertableBody.
/// Will error if the CID in the EventID doesn't match the CID in the EventInsertableBody.
pub fn try_new(order_key: EventId, body: EventInsertableBody) -> Result<Self> {
if order_key.cid() != Some(body.cid()) {
return Err(Error::new_invalid_arg(anyhow!(
"EventID CID does not match the body CID"
)));
}
Ok(Self { order_key, body })
}

/// Get the CID of the event
pub fn cid(&self) -> Cid {
self.body.cid
}

/// Whether this event is deliverable currently
pub fn deliverable(&self) -> bool {
self.body.deliverable()
}
}

#[derive(Debug, Clone)]
/// The type we use to insert events into the database
pub struct EventInsertableBody {
order_key: EventId,
/// The event CID i.e. the root CID from the car file
cid: Cid,
/// Whether the event is deliverable i.e. it's prev has been delivered and the chain is continuous to an init event
Expand All @@ -92,22 +53,30 @@ pub struct EventInsertableBody {
blocks: Vec<EventBlockRaw>,
}

impl EventInsertableBody {
/// Create a new EventInsertRaw struct. Deliverable is set to false by default.
pub fn new(cid: Cid, blocks: Vec<EventBlockRaw>, deliverable: bool) -> Self {
impl EventInsertable {
/// EventInsertable constructor
pub fn new(order_key: EventId, blocks: Vec<EventBlockRaw>, deliverable: bool) -> Self {
let cid = order_key.cid().unwrap();

Self {
order_key,
cid,
deliverable,
blocks,
}
}

/// Get the Recon order key (EventId) of the event.
pub fn order_key(&self) -> &EventId {
&self.order_key
}

/// Get the CID of the event
pub fn cid(&self) -> Cid {
self.cid
}

/// Whether this event is deliverable currently
/// Underlying bytes that make up the event
pub fn blocks(&self) -> &Vec<EventBlockRaw> {
&self.blocks
}
Expand All @@ -123,30 +92,20 @@ impl EventInsertableBody {
self.deliverable = deliverable;
}

/// Find a block from the carfile for a given CID if it's included
pub fn block_for_cid_opt(&self, cid: &Cid) -> Option<&EventBlockRaw> {
self.blocks
.iter()
.find(|b| Cid::new_v1(b.codec.try_into().unwrap(), *b.multihash.inner()) == *cid)
}

/// Find a block from the carfile for a given CID if it's included
pub fn block_for_cid(&self, cid: &Cid) -> Result<&EventBlockRaw> {
self.block_for_cid_opt(cid)
.ok_or_else(|| Error::new_app(anyhow!("Event data is missing data for CID {}", cid)))
}
/// Try to build the EventInsertable struct from a carfile.
pub async fn try_from_carfile(order_key: EventId, car_bytes: &[u8]) -> Result<Self> {
let event_cid = order_key.cid().ok_or_else(|| {
Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", order_key))
})?;

/// Builds a new EventInsertRaw from a CAR file. Will error if the CID in the EventID doesn't match the
/// first root of the carfile.
pub async fn try_from_carfile(event_cid: Cid, val: &[u8]) -> Result<Self> {
if val.is_empty() {
if car_bytes.is_empty() {
return Err(Error::new_app(anyhow!(
"CAR file is empty: cid={}",
event_cid
)))?;
}

let mut reader = CarReader::new(val)
let mut reader = CarReader::new(car_bytes)
.await
.map_err(|e| Error::new_app(anyhow!(e)))?;
let root_cid = reader
Expand All @@ -169,6 +128,11 @@ impl EventInsertableBody {
blocks.push(ebr);
idx += 1;
}
Ok(Self::new(event_cid, blocks, false))
Ok(Self {
order_key,
cid: event_cid,
blocks,
deliverable: false,
})
}
}
2 changes: 1 addition & 1 deletion store/src/sql/entities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod utils;
mod version;

pub use block::{BlockBytes, BlockRow};
pub use event::{rebuild_car, EventInsertable, EventInsertableBody};
pub use event::{rebuild_car, EventInsertable};
pub use event_block::{EventBlockRaw, ReconEventBlockRaw};
pub use hash::{BlockHash, ReconHash};
pub use version::VersionRow;
Expand Down
8 changes: 2 additions & 6 deletions store/src/sql/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use cid::Cid;
use expect_test::expect;
use test_log::test;

use crate::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool};
use crate::{CeramicOneEvent, EventInsertable, SqlitePool};

const MODEL_ID: &str = "k2t6wz4yhfp1r5pwi52gw89nzjbu53qk7m32o5iguw42c6knsaj0feuf927agb";
const CONTROLLER: &str = "did:key:z6Mkqtw7Pj5Lv9xc4PgUYAnwfaVoMC6FRneGWVr5ekTEfKVL";
Expand All @@ -28,11 +28,7 @@ fn random_event(cid: &str) -> EventInsertable {
let order_key = event_id_builder()
.with_event(&Cid::from_str(cid).unwrap())
.build();
let cid = order_key.cid().unwrap();
EventInsertable {
order_key,
body: EventInsertableBody::new(cid, vec![], true),
}
EventInsertable::new(order_key, vec![], true)
}

#[test(tokio::test)]
Expand Down
Loading