feat: store returns information about event validation
- recon pending tracking simplified to just retry with every batch, since we should receive related items close together
- fixes a bug where a single invalid event in an API batch could crash the entire batch (now only that event gets an error)
- TODO: still need to give the service a better validation hook that keeps track of the data we need to return, so we don't have to iterate back through or lose info along the way
dav1do committed Aug 13, 2024
1 parent 82e59d7 commit cd34094
Showing 10 changed files with 136 additions and 334 deletions.
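
To make the per-event validation behavior concrete, here is a minimal, self-contained sketch of the idea: the store classifies every item in a batch and reports the full outcome instead of failing the whole batch on the first invalid event. The `MockStore` type and string items are hypothetical stand-ins for illustration only; the `InsertResult` fields mirror the recon/src/recon.rs changes shown below.

```rust
// Minimal sketch of per-item validation results. `MockStore` and the string
// items are illustrative stand-ins, not the real ceramic-one store API.
#[derive(Debug, Default)]
struct InsertResult {
    new_cnt: usize,       // items actually persisted
    invalid: Vec<String>, // items the store will never accept
    pending: Vec<String>, // items that need more data before they can be interpreted
}

struct MockStore;

impl MockStore {
    // Instead of erroring on the first bad item, classify every item and
    // return the whole picture to the caller.
    fn insert_many(&self, items: &[&str]) -> InsertResult {
        let mut res = InsertResult::default();
        for item in items {
            if item.starts_with("invalid") {
                res.invalid.push(item.to_string());
            } else if item.starts_with("pending") {
                res.pending.push(item.to_string());
            } else {
                res.new_cnt += 1;
            }
        }
        res
    }
}

fn main() {
    let store = MockStore;
    let res = store.insert_many(&["ok-1", "invalid-2", "pending-3", "ok-4"]);
    // Only the invalid item is reported; the rest of the batch still lands.
    println!(
        "persisted={} invalid={:?} pending={:?}",
        res.new_cnt, res.invalid, res.pending
    );
}
```
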
6 changes: 3 additions & 3 deletions recon/src/lib.rs
@@ -6,9 +6,9 @@ pub use crate::{
    error::Error,
    metrics::Metrics,
    recon::{
-        btreestore::BTreeStore, pending_cache::PendingCache, AssociativeHash, EventIdStore,
-        FullInterests, HashCount, InsertResult, InterestProvider, InterestStore, InvalidItem, Key,
-        PendingItem, RangeHash, Recon, ReconInterestProvider, ReconItem, Split, Store, SyncState,
+        btreestore::BTreeStore, AssociativeHash, EventIdStore, FullInterests, HashCount,
+        InsertResult, InterestProvider, InterestStore, InvalidItem, Key, RangeHash, Recon,
+        ReconInterestProvider, ReconItem, Split, Store, SyncState,
    },
    sha256a::Sha256a,
};
55 changes: 34 additions & 21 deletions recon/src/protocol.rs
@@ -29,7 +29,7 @@ use crate::{
        InvalidEvents, MessageLabels, MessageRecv, MessageSent, Metrics, PendingEvents,
        ProtocolRun, ProtocolWriteLoop,
    },
-    recon::{pending_cache::PendingCache, RangeHash, SyncState},
+    recon::{RangeHash, SyncState},
    AssociativeHash, Client, InsertResult, Key, ReconItem, Result as ReconResult,
};

@@ -79,7 +79,7 @@ impl ProtocolConfig {
    pub fn new(insert_batch_size: usize, max_pending_items: usize, peer_id: PeerId) -> Self {
        Self {
            insert_batch_size,
-            max_pending_items,
+            max_pending_items: max_pending_items.max(10), // don't allow less than 10 for now
            peer_id,
        }
    }
@@ -729,32 +729,27 @@ struct Common<R: Recon> {
    recon: R,
    event_q: Vec<ReconItem<R::Key>>,
    config: ProtocolConfig,
-    pending: PendingCache<R::Key>,
+    pending_q: Vec<ReconItem<R::Key>>,
}

impl<R> Common<R>
where
    R: Recon,
{
    fn new(recon: R, config: ProtocolConfig) -> Self {
-        // allow at least 10 events to be pending
-        let pending = PendingCache::new(config.max_pending_items.max(10));
+        let pending = Vec::with_capacity(config.max_pending_items.max(10));
        Self {
            recon,
            event_q: Vec::with_capacity(config.insert_batch_size.saturating_add(1)),
            config,
-            pending,
+            pending_q: pending,
        }
    }

    async fn process_value_response(&mut self, key: R::Key, value: Vec<u8>) -> Result<()> {
        let new = ReconItem::new(key, value);
-        if let Some(ok_now) = self.pending.remove_by_needed(&new) {
-            self.event_q.push(new);
-            self.event_q.extend(ok_now);
-        } else if !self.pending.is_tracking(&new) {
-            self.event_q.push(new);
-        }
+        self.event_q.push(new);
+
        if self.event_q.len() >= self.config.insert_batch_size {
            self.persist_all().await?;
        }
@@ -782,33 +777,51 @@ where
        }
    }

+    /// We attempt to write data in batches to reduce lock contention. However, we need to persist everything in a few cases:
+    /// - before we calculate a range response, we need to make sure our result (calculated from disk) is up to date
+    /// - before we sign off on a conversation as either the initiator or responder
+    /// - when our in memory list gets too large
    async fn persist_all(&mut self) -> Result<()> {
-        if self.event_q.is_empty() {
+        if self.event_q.is_empty() && self.pending_q.is_empty() {
            return Ok(());
        }

-        let evs: Vec<_> = self.event_q.drain(..).collect();
+        // we should get related items close together, so for now we don't do any fancy tracking of pending items
+        // and simply track them and retry them with every new batch.
+        let evs: Vec<_> = self
+            .event_q
+            .drain(..)
+            .chain(self.pending_q.drain(..))
+            .collect();

        let mut batch = self.recon.insert(evs).await.context("persisting all")?;
        if !batch.invalid.is_empty() {
            if let Ok(cnt) = batch.invalid.len().try_into() {
                self.recon.metrics().record(&InvalidEvents(cnt))
            }
            tracing::warn!(
-                "Recon discovered data it will never allow. Hanging up on peer {}",
-                self.config.peer_id
+                invalid_cnt=%batch.invalid.len(), peer_id=%self.config.peer_id,
+                "Recon discovered data it will never allow. Hanging up on peer",
            );
            bail!("Received unknown data from peer: {}", self.config.peer_id);
        }

        if !batch.pending.is_empty() {
-            let (added, remaining_capacity) = self.pending.track_pending(&mut batch.pending);
-            if let Ok(cnt) = added.try_into() {
+            let new_pending_cnt = batch.pending.len();
+            if let Ok(cnt) = new_pending_cnt.try_into() {
                self.recon.metrics().record(&PendingEvents(cnt))
            }
-            if remaining_capacity < self.config.max_pending_items / 10 {
-                tracing::debug!(peer_id=%self.config.peer_id,
-                    capacity=%self.config.max_pending_items, %remaining_capacity, "Pending queue has less than 10% capacity remaining");
+            let capacity = self
+                .config
+                .max_pending_items
+                .saturating_sub(batch.pending.len());
+
+            let to_cache = batch.pending.drain(0..capacity.min(new_pending_cnt));
+            self.pending_q.extend(to_cache);
+
+            if !batch.pending.is_empty() {
+                tracing::info!(dropped=%batch.pending.len(), queue_size=%self.pending_q.len(),
+                    "In memory pending queue is full. Dropping items until a future sync.")
            }
        }

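A condensed sketch of the simplified pending handling introduced above: previously pending items are drained and retried with every new batch, the in-memory queue is capped, and any overflow is dropped until a future sync. The `Item` type and the `fake_insert` helper are illustrative stand-ins rather than the actual Recon store API, and the capacity math is simplified relative to the diff.

```rust
// Simplified model of the new pending handling in persist_all: retry every
// previously pending item with each new batch, keep at most `max_pending`
// still-pending items in memory, and drop the overflow until a future sync.
type Item = String;

// Stand-in for the store: pretend anything prefixed "pending" still cannot
// be interpreted and comes back as pending.
fn fake_insert(items: Vec<Item>) -> Vec<Item> {
    items
        .into_iter()
        .filter(|i| i.starts_with("pending"))
        .collect()
}

fn persist_all(event_q: &mut Vec<Item>, pending_q: &mut Vec<Item>, max_pending: usize) {
    if event_q.is_empty() && pending_q.is_empty() {
        return;
    }
    // Chain the pending queue onto the new batch so pending items are retried.
    let evs: Vec<Item> = event_q.drain(..).chain(pending_q.drain(..)).collect();
    let mut still_pending = fake_insert(evs);

    // Cap the in-memory queue; anything that does not fit is dropped.
    let keep = max_pending.min(still_pending.len());
    pending_q.extend(still_pending.drain(0..keep));
    if !still_pending.is_empty() {
        eprintln!(
            "dropping {} pending items until a future sync",
            still_pending.len()
        );
    }
}

fn main() {
    let mut event_q: Vec<Item> = vec!["ok-1".into(), "pending-2".into(), "pending-3".into()];
    let mut pending_q: Vec<Item> = Vec::new();
    persist_all(&mut event_q, &mut pending_q, 1);
    println!("pending after batch: {:?}", pending_q); // ["pending-2"]; one item was dropped
}
```
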
25 changes: 2 additions & 23 deletions recon/src/recon.rs
@@ -1,5 +1,4 @@
pub mod btreestore;
-pub mod pending_cache;
#[cfg(test)]
pub mod tests;

@@ -443,26 +442,6 @@ pub enum InvalidItem<K: Key> {
    },
}

-#[derive(Clone, Debug, PartialEq, Eq)]
-/// Represents items the store may be able to persist in the future
-pub struct PendingItem<K>
-where
-    K: Key,
-{
-    /// The key this item needs in order to be processed.
-    pub required_key: K,
-    /// The item that could not be stored. Can be retried in the future if
-    /// the `required_key` is discovered.
-    pub item: ReconItem<K>,
-}
-
-impl<K: Key> PendingItem<K> {
-    /// Construct a pending item
-    pub fn new(required_key: K, item: ReconItem<K>) -> Self {
-        Self { required_key, item }
-    }
-}
-
/// The result of an insert operation.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct InsertResult<K>
@@ -478,7 +457,7 @@ where
    /// Items that may be processed in the future but require more information to be interpreted.
    /// For example, the store may need another item to interpret this event (e.g. `PendingItem::RequiresEvent`).
    /// In the context of Ceramic, the init event is needed to verify if a data event is valid.
-    pub pending: Vec<PendingItem<K>>,
+    pub pending: Vec<ReconItem<K>>,
}

impl<K> InsertResult<K>
@@ -502,7 +481,7 @@ where
    pub fn new_err(
        new_cnt: usize,
        invalid: Vec<InvalidItem<K>>,
-        pending: Vec<PendingItem<K>>,
+        pending: Vec<ReconItem<K>>,
    ) -> Self {
        Self {
            new_cnt,
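Because pending items are now simply retried wholesale with the next batch, the store no longer needs to record which key each pending item was waiting on, which is why `PendingItem` is removed and `pending` carries the original items directly. A small hypothetical before/after illustration follows; the concrete `String` types here stand in for the generic `ReconItem<K>` / `InsertResult<K>` shown in the diff.

```rust
// Before: each pending item carried the key it was still waiting on so a
// cache could release it once that key arrived.
#[allow(dead_code)]
struct PendingItemOld {
    required_key: String,
    item: String,
}

// After: pending items are returned as-is; the protocol just retries them
// with the next batch, so no per-item dependency bookkeeping is needed.
#[derive(Debug, Default)]
struct InsertResultNew {
    new_cnt: usize,
    invalid: Vec<String>,
    pending: Vec<String>, // plain items, formerly Vec<PendingItemOld>
}

fn main() {
    let res = InsertResultNew {
        new_cnt: 2,
        invalid: Vec::new(),
        pending: vec!["data-event-waiting-on-init".to_string()],
    };
    // The caller can extend its retry queue directly from `pending`.
    println!("retry later: {:?}", res.pending);
}
```
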