Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Change EventInsertable to store the parsed Event object instead of its raw blocks #483

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 63 additions & 38 deletions event/src/unvalidated/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,44 @@ use std::{collections::HashMap, fmt::Debug};
use tokio::io::AsyncRead;
use tracing::debug;

use super::{cid_from_dag_cbor, init, signed};
use super::{cid_from_dag_cbor, init, signed, Payload};

/// Helper function for Event::decode_car for gathering all the Ipld blocks used by a time event
/// witness proof.
fn get_time_event_witness_blocks(
event: &RawTimeEvent,
proof: &Proof,
car_blocks: HashMap<Cid, Vec<u8>>,
) -> anyhow::Result<Vec<Ipld>> {
let mut blocks_in_path = Vec::new();
if event.prev == proof.root && event.path.is_empty() {
return Ok(blocks_in_path);
}

let block_bytes = car_blocks
.get(&proof.root())
.ok_or_else(|| anyhow!("Time Event CAR data missing block for root",))?;
let mut block: Ipld = serde_ipld_dagcbor::from_slice(block_bytes)?;
blocks_in_path.push(block.clone());
let parts: Vec<_> = event.path().split('/').collect();
// Add blocks for all parts but the last as it is the prev.
for index in parts.iter().take(parts.len() - 1) {
let cid = block
.get(*index)?
.ok_or_else(|| anyhow!("Time Event path indexes missing data"))?;
let cid = match cid {
Ipld::Link(cid) => cid,
_ => bail!("Time Event path does not index to a CID"),
};
let block_bytes = car_blocks
.get(cid)
.ok_or_else(|| anyhow!("Time Event CAR data missing block for path index"))?;
blocks_in_path.push(block);
block = serde_ipld_dagcbor::from_slice(block_bytes)?;
}

Ok(blocks_in_path)
}

/// Materialized Ceramic Event where internal structure is accessible.
#[derive(Debug)]
Expand All @@ -28,48 +65,37 @@ impl<D> Event<D>
where
D: serde::Serialize + for<'de> serde::Deserialize<'de>,
{
/// Encode the event into a CAR bytes containing all blocks of the event.
pub async fn encode_car(&self) -> anyhow::Result<Vec<u8>> {
/// Returns true if this Event is an init event, and false otherwise
pub fn is_init(&self) -> bool {
match self {
Event::Time(event) => event.encode_car().await,
Event::Signed(event) => event.encode_car().await,
Event::Unsigned(event) => event.encode_car().await,
Event::Time(_) => false,
Event::Signed(event) => match event.payload() {
Payload::Data(_) => false,
Payload::Init(_) => true,
},
Event::Unsigned(_) => true,
}
}

fn get_time_event_witness_blocks(
event: &RawTimeEvent,
proof: &Proof,
car_blocks: HashMap<Cid, Vec<u8>>,
) -> anyhow::Result<Vec<Ipld>> {
let mut blocks_in_path = Vec::new();
if event.prev == proof.root && event.path.is_empty() {
return Ok(blocks_in_path);
/// Returns the prev CID (or None if the event is an init event)
pub fn prev(&self) -> Option<Cid> {
match self {
Event::Time(t) => Some(t.prev()),
Event::Signed(event) => match event.payload() {
Payload::Data(d) => Some(*d.prev()),
Payload::Init(_) => None,
},
Event::Unsigned(_) => None,
}
}

let block_bytes = car_blocks
.get(&proof.root())
.ok_or_else(|| anyhow!("Time Event CAR data missing block for root",))?;
let mut block: Ipld = serde_ipld_dagcbor::from_slice(block_bytes)?;
blocks_in_path.push(block.clone());
let parts: Vec<_> = event.path().split('/').collect();
// Add blocks for all parts but the last as it is the prev.
for index in parts.iter().take(parts.len() - 1) {
let cid = block
.get(*index)?
.ok_or_else(|| anyhow!("Time Event path indexes missing data"))?;
let cid = match cid {
Ipld::Link(cid) => cid,
_ => bail!("Time Event path does not index to a CID"),
};
let block_bytes = car_blocks
.get(cid)
.ok_or_else(|| anyhow!("Time Event CAR data missing block for path index"))?;
blocks_in_path.push(block);
block = serde_ipld_dagcbor::from_slice(block_bytes)?;
/// Encode the event into a CAR bytes containing all blocks of the event.
pub async fn encode_car(&self) -> anyhow::Result<Vec<u8>> {
match self {
Event::Time(event) => event.encode_car().await,
Event::Signed(event) => event.encode_car().await,
Event::Unsigned(event) => event.encode_car().await,
}

Ok(blocks_in_path)
}

/// Decode bytes into a materialized event.
Expand Down Expand Up @@ -117,8 +143,7 @@ where
.ok_or_else(|| anyhow!("Time Event CAR data missing block for proof"))?;
let proof: Proof =
serde_ipld_dagcbor::from_slice(proof_bytes).context("decoding proof")?;
let blocks_in_path =
Self::get_time_event_witness_blocks(&event, &proof, car_blocks)?;
let blocks_in_path = get_time_event_witness_blocks(&event, &proof, car_blocks)?;
let blocks_in_path = blocks_in_path
.into_iter()
.map(|block| match block {
Expand Down
118 changes: 58 additions & 60 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@ use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool};

use crate::Result;

use super::service::EventMetadata;

pub(crate) struct OrderEvents {
deliverable: Vec<(EventInsertable, EventMetadata)>,
missing_history: Vec<(EventInsertable, EventMetadata)>,
deliverable: Vec<EventInsertable>,
missing_history: Vec<EventInsertable>,
}

impl OrderEvents {
pub fn deliverable(&self) -> &[(EventInsertable, EventMetadata)] {
pub fn deliverable(&self) -> &[EventInsertable] {
&self.deliverable
}

pub fn missing_history(&self) -> &[(EventInsertable, EventMetadata)] {
pub fn missing_history(&self) -> &[EventInsertable] {
&self.missing_history
}
}
Expand All @@ -36,58 +34,60 @@ impl OrderEvents {
/// *could* mark B deliverable and then C and D, but we DO NOT want to do this here to prevent API users from writing events that they haven't seen.
pub async fn try_new(
pool: &SqlitePool,
mut candidate_events: Vec<(EventInsertable, EventMetadata)>,
mut candidate_events: Vec<EventInsertable>,
) -> Result<Self> {
let mut new_cids: HashMap<Cid, bool> =
HashMap::from_iter(candidate_events.iter_mut().map(|(e, meta)| {
HashMap::from_iter(candidate_events.iter_mut().map(|e| {
// all init events are deliverable so we mark them as such before we do anything else
if matches!(meta, EventMetadata::Init { .. }) {
e.body.set_deliverable(true);
if e.event().is_init() {
e.set_deliverable(true);
}
(e.cid(), e.deliverable())
}));
let mut deliverable = Vec::with_capacity(candidate_events.len());
candidate_events.retain(|(e, h)| {
let mut remaining_candidates = Vec::with_capacity(candidate_events.len());

for e in candidate_events {
if e.deliverable() {
deliverable.push((e.clone(), h.clone()));
false
deliverable.push(e)
} else {
true
remaining_candidates.push(e)
}
});
if candidate_events.is_empty() {
}

if remaining_candidates.is_empty() {
return Ok(OrderEvents {
deliverable,
missing_history: Vec::new(),
});
}

let mut undelivered_prevs_in_memory = VecDeque::with_capacity(candidate_events.len());
let mut missing_history = Vec::with_capacity(candidate_events.len());
let mut undelivered_prevs_in_memory = VecDeque::with_capacity(remaining_candidates.len());
let mut missing_history = Vec::with_capacity(remaining_candidates.len());

while let Some((mut event, header)) = candidate_events.pop() {
match header.prev() {
while let Some(mut event) = remaining_candidates.pop() {
match event.event().prev() {
None => {
unreachable!("Init events should have been filtered out since they're always deliverable");
}
Some(prev) => {
if let Some(in_mem_is_deliverable) = new_cids.get(&prev) {
if *in_mem_is_deliverable {
event.body.set_deliverable(true);
event.set_deliverable(true);
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
deliverable.push((event, header));
deliverable.push(event);
} else {
undelivered_prevs_in_memory.push_back((event, header));
undelivered_prevs_in_memory.push_back(event);
}
} else {
let (_exists, prev_deliverable) =
CeramicOneEvent::deliverable_by_cid(pool, &prev).await?;
if prev_deliverable {
event.body.set_deliverable(true);
event.set_deliverable(true);
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
deliverable.push((event, header));
deliverable.push(event);
} else {
missing_history.push((event, header));
missing_history.push(event);
}
}
}
Expand All @@ -101,21 +101,21 @@ impl OrderEvents {
// We can't quite get rid of this loop because we may have discovered our prev's prev from the database in the previous pass.
let max_iterations = undelivered_prevs_in_memory.len();
let mut iteration = 0;
while let Some((mut event, header)) = undelivered_prevs_in_memory.pop_front() {
while let Some(mut event) = undelivered_prevs_in_memory.pop_front() {
iteration += 1;
match header.prev() {
match event.event().prev() {
None => {
unreachable!("Init events should have been filtered out of the in memory set");
}
Some(prev) => {
if new_cids.get(&prev).map_or(false, |v| *v) {
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
event.body.set_deliverable(true);
deliverable.push((event, header));
event.set_deliverable(true);
deliverable.push(event);
// reset the iteration count since we made changes. once it doesn't change for a loop through the queue we're done
iteration = 0;
} else {
undelivered_prevs_in_memory.push_back((event, header));
undelivered_prevs_in_memory.push_back(event);
}
}
}
Expand Down Expand Up @@ -147,7 +147,7 @@ mod test {
async fn get_2_streams() -> (
Vec<ReconItem<EventId>>,
Vec<ReconItem<EventId>>,
Vec<(EventInsertable, EventMetadata)>,
Vec<EventInsertable>,
) {
let stream_2 = get_n_events(10).await;
let stream_1 = get_n_events(10).await;
Expand All @@ -165,34 +165,43 @@ mod test {
fn split_deliverable_order_by_stream(
stream_1: &[ReconItem<EventId>],
stream_2: &[ReconItem<EventId>],
events: &[(EventInsertable, EventMetadata)],
events: &[EventInsertable],
) -> (Vec<EventId>, Vec<EventId>) {
let mut after_1 = Vec::with_capacity(stream_1.len());
let mut after_2 = Vec::with_capacity(stream_2.len());
for (event, _) in events {
for event in events {
assert!(event.deliverable());
if stream_1.iter().any(|e| e.key == event.order_key) {
after_1.push(event.order_key.clone());
if stream_1.iter().any(|e| e.key == *event.order_key()) {
after_1.push(event.order_key().clone());
} else {
after_2.push(event.order_key.clone());
after_2.push(event.order_key().clone());
}
}

(after_1, after_2)
}

/// Takes the given events from Recon and turns them into two vectors of insertable events.
async fn get_insertable_events(
events: &[ReconItem<EventId>],
) -> Vec<(EventInsertable, EventMetadata)> {
let mut insertable = Vec::with_capacity(events.len());
first_vec_count: usize,
) -> (Vec<EventInsertable>, Vec<EventInsertable>) {
let mut insertable = Vec::with_capacity(first_vec_count);
let mut remaining = Vec::with_capacity(events.len() - first_vec_count);
let mut i = 0;
for event in events {
let new = CeramicEventService::parse_discovered_event(event)
.await
.unwrap();
insertable.push(new);
if i < first_vec_count {
insertable.push(new);
} else {
remaining.push(new)
}
i += 1
}

insertable
(insertable, remaining)
}

#[test(tokio::test)]
Expand Down Expand Up @@ -256,13 +265,7 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();

let stream_1 = get_n_events(10).await;
let insertable = get_insertable_events(&stream_1).await;
let to_insert = insertable
.iter()
.take(3)
.map(|(i, _)| i.clone())
.collect::<Vec<_>>();
let mut remaining = insertable.into_iter().skip(3).collect::<Vec<_>>();
let (to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await;
CeramicOneEvent::insert_many(&pool, to_insert.iter())
.await
.unwrap();
Expand All @@ -285,23 +288,18 @@ mod test {
let pool = SqlitePool::connect_in_memory().await.unwrap();

let stream_1 = get_n_events(10).await;
let mut insertable = get_insertable_events(&stream_1).await;
let to_insert = insertable
.iter_mut()
.take(3)
.map(|(i, _)| {
i.body.set_deliverable(true);
i.clone()
})
.collect::<Vec<_>>();
let mut remaining = insertable.into_iter().skip(3).collect::<Vec<_>>();
let (mut to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await;
for item in to_insert.as_mut_slice() {
item.set_deliverable(true)
}

CeramicOneEvent::insert_many(&pool, to_insert.iter())
.await
.unwrap();

let expected = remaining
.iter()
.map(|(i, _)| i.order_key.clone())
.map(|i| i.order_key().clone())
.collect::<Vec<_>>();
remaining.shuffle(&mut thread_rng());

Expand All @@ -314,7 +312,7 @@ mod test {
let after = ordered
.deliverable
.iter()
.map(|(e, _)| e.order_key.clone())
.map(|e| e.order_key().clone())
.collect::<Vec<_>>();
assert_eq!(expected, after);
}
Expand Down
Loading
Loading