diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs index 84a107c0e..58971b3de 100644 --- a/service/src/event/order_events.rs +++ b/service/src/event/order_events.rs @@ -47,25 +47,27 @@ impl OrderEvents { (e.cid(), e.deliverable()) })); let mut deliverable = Vec::with_capacity(candidate_events.len()); - candidate_events.retain(|(e, h)| { + let mut remaining_candidates = Vec::with_capacity(candidate_events.len()); + + for (e, h) in candidate_events { if e.deliverable() { - deliverable.push((e.clone(), h.clone())); - false + deliverable.push((e, h)) } else { - true + remaining_candidates.push((e, h)) } - }); - if candidate_events.is_empty() { + } + + if remaining_candidates.is_empty() { return Ok(OrderEvents { deliverable, - missing_history: Vec::new(), + missing_history: remaining_candidates, }); } - let mut undelivered_prevs_in_memory = VecDeque::with_capacity(candidate_events.len()); - let mut missing_history = Vec::with_capacity(candidate_events.len()); + let mut undelivered_prevs_in_memory = VecDeque::with_capacity(remaining_candidates.len()); + let mut missing_history = Vec::with_capacity(remaining_candidates.len()); - while let Some((mut event, header)) = candidate_events.pop() { + while let Some((mut event, header)) = remaining_candidates.pop() { match header.prev() { None => { unreachable!("Init events should have been filtered out since they're always deliverable"); @@ -181,18 +183,30 @@ mod test { (after_1, after_2) } + /// Takes the given events from Recon and turns them into two vectors of insertable events. async fn get_insertable_events( events: &[ReconItem], - ) -> Vec<(EventInsertable, EventMetadata)> { - let mut insertable = Vec::with_capacity(events.len()); + first_vec_count: usize, + ) -> ( + Vec<(EventInsertable, EventMetadata)>, + Vec<(EventInsertable, EventMetadata)>, + ) { + let mut insertable = Vec::with_capacity(first_vec_count); + let mut remaining = Vec::with_capacity(events.len() - first_vec_count); + let mut i = 0; for event in events { let new = CeramicEventService::parse_discovered_event(event) .await .unwrap(); - insertable.push(new); + if i < first_vec_count { + insertable.push(new); + } else { + remaining.push(new) + } + i += 1 } - insertable + (insertable, remaining) } #[test(tokio::test)] @@ -256,14 +270,8 @@ mod test { let pool = SqlitePool::connect_in_memory().await.unwrap(); let stream_1 = get_n_events(10).await; - let insertable = get_insertable_events(&stream_1).await; - let to_insert = insertable - .iter() - .take(3) - .map(|(i, _)| i.clone()) - .collect::>(); - let mut remaining = insertable.into_iter().skip(3).collect::>(); - CeramicOneEvent::insert_many(&pool, to_insert.iter()) + let (to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await; + CeramicOneEvent::insert_many(&pool, to_insert.iter().map(|(i, _)| i)) .await .unwrap(); @@ -285,17 +293,12 @@ mod test { let pool = SqlitePool::connect_in_memory().await.unwrap(); let stream_1 = get_n_events(10).await; - let mut insertable = get_insertable_events(&stream_1).await; - let to_insert = insertable - .iter_mut() - .take(3) - .map(|(i, _)| { - i.set_deliverable(true); - i.clone() - }) - .collect::>(); - let mut remaining = insertable.into_iter().skip(3).collect::>(); - CeramicOneEvent::insert_many(&pool, to_insert.iter()) + let (mut to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await; + for item in to_insert.as_mut_slice() { + item.0.set_deliverable(true) + } + + CeramicOneEvent::insert_many(&pool, to_insert.iter().map(|(ei, _)| ei)) .await .unwrap(); diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index fcd40e708..070bf04b9 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -628,12 +628,11 @@ mod test { } async fn insert_10_with_9_undelivered(pool: &SqlitePool) { - let insertable = get_n_insertable_events(10).await; - let mut init = insertable.first().unwrap().to_owned(); + let mut insertable = get_n_insertable_events(10).await; + let mut init = insertable.remove(0); init.set_deliverable(true); - let undelivered = insertable.into_iter().skip(1).collect::>(); - let new = CeramicOneEvent::insert_many(pool, undelivered.iter()) + let new = CeramicOneEvent::insert_many(pool, insertable.iter()) .await .unwrap(); diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs index d65d130eb..7e3ea39af 100644 --- a/store/src/sql/entities/event.rs +++ b/store/src/sql/entities/event.rs @@ -39,7 +39,7 @@ pub async fn rebuild_car(blocks: Vec) -> Result>> { Ok(Some(car)) } -#[derive(Debug, Clone)] +#[derive(Debug)] /// The type we use to insert events into the database pub struct EventInsertable { /// The event order key (e.g. EventID)