Skip to content

Commit

Permalink
feat: track and resubmit events from recon when history discovered
Browse files Browse the repository at this point in the history
we also explicitly hang up on the peer if the store says something is invalid
  • Loading branch information
dav1do committed Aug 9, 2024
1 parent cb4fec5 commit dff5ab9
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 12 deletions.
2 changes: 1 addition & 1 deletion recon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub use crate::{
error::Error,
metrics::Metrics,
recon::{
btreestore::BTreeStore, AssociativeHash, EventIdStore, FullInterests, HashCount,
btreestore::BTreeStore, pending_cache::PendingCache, AssociativeHash, EventIdStore,
FullInterests, HashCount, InsertBatch, InterestProvider, InterestStore, InvalidItem, Key,
PendingItem, RangeHash, Recon, ReconInterestProvider, ReconItem, Split, Store, SyncState,
},
Expand Down
33 changes: 33 additions & 0 deletions recon/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub struct Metrics {

protocol_write_loop_count: Counter,
protocol_run_duration: Histogram,

protocol_pending_items: Counter,
protocol_invalid_items: Counter,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
Expand Down Expand Up @@ -97,11 +100,27 @@ impl Metrics {
sub_registry
);

register!(
protocol_pending_items,
"Number of items received that depend on undiscovered events",
Counter::default(),
sub_registry
);

register!(
protocol_invalid_items,
"Number of items received that were considered invalid",
Counter::default(),
sub_registry
);

Self {
protocol_message_received_count,
protocol_message_sent_count,
protocol_write_loop_count,
protocol_run_duration,
protocol_pending_items,
protocol_invalid_items,
}
}
}
Expand Down Expand Up @@ -145,3 +164,17 @@ impl Recorder<ProtocolRun> for Metrics {
self.protocol_run_duration.observe(event.0.as_secs_f64());
}
}

/// Metric event carrying a count of received items that were considered invalid.
pub(crate) struct InvalidEvents(pub u64);

impl Recorder<InvalidEvents> for Metrics {
    /// Adds the carried count to the `protocol_invalid_items` counter.
    fn record(&self, event: &InvalidEvents) {
        let InvalidEvents(count) = event;
        self.protocol_invalid_items.inc_by(*count);
    }
}

/// Metric event carrying a count of received items that depend on undiscovered events.
pub(crate) struct PendingEvents(pub u64);

impl Recorder<PendingEvents> for Metrics {
    /// Adds the carried count to the `protocol_pending_items` counter.
    fn record(&self, event: &PendingEvents) {
        let &PendingEvents(count) = event;
        self.protocol_pending_items.inc_by(count);
    }
}
59 changes: 52 additions & 7 deletions recon/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
//! Encoding and framing of messages is outside the scope of this crate.
//! However the message types do implement serde::Serialize and serde::Deserialize.

use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use async_stream::try_stream;
use async_trait::async_trait;
use ceramic_core::RangeOpen;
Expand All @@ -25,8 +25,11 @@ use tracing::{instrument, trace, Level};
use uuid::Uuid;

use crate::{
metrics::{MessageLabels, MessageRecv, MessageSent, Metrics, ProtocolRun, ProtocolWriteLoop},
recon::{RangeHash, SyncState},
metrics::{
InvalidEvents, MessageLabels, MessageRecv, MessageSent, Metrics, PendingEvents,
ProtocolRun, ProtocolWriteLoop,
},
recon::{pending_cache::PendingCache, RangeHash, SyncState},
AssociativeHash, Client, InsertBatch, Key, ReconItem, Result as ReconResult,
};

Expand Down Expand Up @@ -626,7 +629,12 @@ where
// There are optimizations we can make here (to the protocol or using in memory data), but for now we keep it simple
// and should still get some benefit as we were previously writing every value individually.
self.common.persist_all().await?;
let sync_state = self.common.recon.process_range(range).await?;
let sync_state = self
.common
.recon
.process_range(range)
.await
.context("responder process_range")?;
match sync_state {
SyncState::Synchronized { range } => {
// We are in sync: echo back the same range so that the remote learns we are in sync.
Expand Down Expand Up @@ -721,26 +729,38 @@ struct Common<R: Recon> {
recon: R,
event_q: Vec<ReconItem<R::Key>>,
config: ProtocolConfig,
pending: PendingCache<R::Key>,
}

impl<R> Common<R>
where
R: Recon,
{
/// Build the shared protocol state around `recon`.
///
/// The event queue is pre-sized to one more than `config.insert_batch_size`, and the
/// pending cache is sized from `config.max_pending_items` with a floor of 10.
fn new(recon: R, config: ProtocolConfig) -> Self {
    // Always leave room for at least 10 pending events, whatever the configuration says.
    let pending_limit = config.max_pending_items.max(10);
    let queue_capacity = config.insert_batch_size.saturating_add(1);
    Self {
        pending: PendingCache::new(pending_limit),
        event_q: Vec::with_capacity(queue_capacity),
        recon,
        config,
    }
}

/// Queue a value received from the remote and flush the queue once it reaches the
/// configured batch size.
///
/// - If the new item is the key some pending item was waiting on, the new item is queued
///   first, followed by the item it unblocks. Any item that was in turn waiting on a
///   released item is queued after it, so a whole dependency chain is resubmitted in order.
/// - If the item is itself already tracked as pending, it is not queued again.
/// - Otherwise the item is queued as is.
///
/// # Errors
/// Returns whatever `persist_all` returns when the queue is flushed.
async fn process_value_response(&mut self, key: R::Key, value: Vec<u8>) -> Result<()> {
    let new = ReconItem::new(key, value);
    if let Some(ok_now) = self.pending.remove_by_needed(&new) {
        self.event_q.push(new);
        // A released item may be the key another pending item waits on. Nothing will
        // deliver the released item to us a second time, so follow the chain here.
        // Each step removes an entry from the cache, so the loop terminates.
        let mut released = ok_now;
        while let Some(next) = self.pending.remove_by_needed(&released) {
            self.event_q.push(released);
            released = next;
        }
        self.event_q.push(released);
    } else if !self.pending.is_tracking(&new) {
        self.event_q.push(new);
    }
    if self.event_q.len() >= self.config.insert_batch_size {
        self.persist_all().await?;
    }
    Ok(())
}

// The remote is missing all keys in the range; send them over.
fn process_remote_missing_ranges(
&mut self,
Expand All @@ -763,10 +783,35 @@ where
}

async fn persist_all(&mut self) -> Result<()> {
if self.event_q.is_empty() {
return Ok(());
}

let evs: Vec<_> = self.event_q.drain(..).collect();
if !evs.is_empty() {
self.recon.insert(evs).await?;

let mut batch = self.recon.insert(evs).await.context("persisting all")?;
if !batch.invalid.is_empty() {
if let Ok(cnt) = batch.invalid.len().try_into() {
self.recon.metrics().record(&InvalidEvents(cnt))
}
tracing::warn!(
"Recon discovered data it will never allow. Hanging up on peer {}",
self.config.peer_id
);
bail!("Received unknown data from peer: {}", self.config.peer_id);
}

if !batch.pending.is_empty() {
let (added, remaining_capacity) = self.pending.track_pending(&mut batch.pending);
if let Ok(cnt) = added.try_into() {
self.recon.metrics().record(&PendingEvents(cnt))
}
if remaining_capacity < self.config.max_pending_items / 10 {
tracing::debug!(peer_id=%self.config.peer_id,
capacity=%self.config.max_pending_items, %remaining_capacity, "Pending queue has less than 10% capacity remaining");
}
}

Ok(())
}
}
Expand Down
32 changes: 28 additions & 4 deletions recon/src/recon.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod btreestore;
pub mod pending_cache;
#[cfg(test)]
pub mod tests;

Expand Down Expand Up @@ -280,11 +281,10 @@ where
self.store.value_for_key(&key).await
}

/// Insert keys into the key space.
///
/// Forwards `items` to the store's `insert_many` and returns the resulting
/// [`InsertBatch`], which carries the count of new keys along with any items the store
/// reported as invalid or pending.
///
/// # Errors
/// Propagates any error returned by the store.
pub async fn insert(&self, items: Vec<ReconItem<K>>) -> Result<InsertBatch<K>> {
    let res = self.store.insert_many(&items).await?;
    Ok(res)
}

/// Reports total number of keys
Expand Down Expand Up @@ -456,6 +456,13 @@ where
pub item: ReconItem<K>,
}

impl<K: Key> PendingItem<K> {
/// Construct a pending item
pub fn new(required_key: K, item: ReconItem<K>) -> Self {
Self { required_key, item }
}
}

/// The result of an insert operation.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
pub struct InsertBatch<K>
Expand Down Expand Up @@ -487,6 +494,23 @@ where
}
}

/// Total number of items this batch accounts for: new keys plus invalid and pending items.
pub fn item_count(&self) -> usize {
    let not_stored = self.invalid.len() + self.pending.len();
    self.new_cnt + not_stored
}
/// Build a batch directly from its three parts: the number of new keys, the items that
/// were invalid, and the items that are pending.
pub fn new_err(
    new_cnt: usize,
    invalid: Vec<InvalidItem<K>>,
    pending: Vec<PendingItem<K>>,
) -> Self {
    Self {
        pending,
        invalid,
        new_cnt,
    }
}

/// true if any key is new, false otherwise
pub fn included_new_key(&self) -> bool {
self.new_cnt > 0
Expand Down
153 changes: 153 additions & 0 deletions recon/src/recon/pending_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use std::collections::HashMap;

use crate::{Key, PendingItem, ReconItem};

/// This struct manages tracking items that we attempted to deliver but could not be processed
/// because we need to discover additional information.
///
/// Two indexes are kept in lockstep:
/// - `by_needed_key`: bytes of the required key -> the item waiting for it;
/// - `item_keys`: bytes of the key of every item currently waiting.
///
/// At most one item waits on any given required key, and at most `max_size` items are tracked.
#[derive(Debug, Default)]
pub struct PendingCache<K: Key> {
    by_needed_key: HashMap<Vec<u8>, ReconItem<K>>,
    item_keys: std::collections::HashSet<Vec<u8>>,
    max_size: usize,
}

impl<K: Key> PendingCache<K> {
    /// Create a new PendingCache with the given capacity
    pub fn new(max_size: usize) -> Self {
        Self {
            by_needed_key: Default::default(),
            item_keys: Default::default(),
            max_size,
        }
    }

    /// Number of additional items that can still be tracked.
    fn capacity(&self) -> usize {
        self.max_size.saturating_sub(self.size())
    }

    /// Number of items currently tracked.
    fn size(&self) -> usize {
        debug_assert_eq!(self.by_needed_key.len(), self.item_keys.len());
        self.by_needed_key.len()
    }

    /// Stop tracking an item and return it if `new_item` is the key it required.
    pub fn remove_by_needed(&mut self, new_item: &ReconItem<K>) -> Option<ReconItem<K>> {
        let ok_now = self.by_needed_key.remove(new_item.key.as_bytes())?;
        self.item_keys.remove(ok_now.key.as_bytes());
        Some(ok_now)
    }

    /// Returns true if we're already tracking the given item
    pub fn is_tracking(&self, item: &ReconItem<K>) -> bool {
        self.item_keys.contains(item.key.as_bytes())
    }

    /// Update our cache to include the new items up to the allowed capacity.
    /// Returns a tuple of (items tracked, capacity remaining)
    ///
    /// Items are taken from the end of `items`; anything that does not fit is left in the
    /// vector. An item that is already tracked is dropped without being counted. When a new
    /// item requires the same key as an item that is already tracked, the new item replaces
    /// the old one and the old one stops being tracked, so both indexes stay consistent.
    pub fn track_pending(&mut self, items: &mut Vec<PendingItem<K>>) -> (usize, usize) {
        let mut tracked = 0;
        let mut allowed = self.capacity();
        while allowed > 0 {
            let val = match items.pop() {
                Some(val) => val,
                None => break,
            };
            if self.item_keys.contains(val.item.key.as_bytes()) {
                continue;
            }
            self.item_keys.insert(val.item.key.as_bytes().to_vec());
            let displaced = self
                .by_needed_key
                .insert(val.required_key.as_bytes().to_vec(), val.item);
            match displaced {
                // The slot for this required key was taken: forget the displaced item's key,
                // otherwise `is_tracking` would report it forever although nothing can
                // release it any more. The cache did not grow, so capacity is unchanged.
                Some(old) => {
                    self.item_keys.remove(old.key.as_bytes());
                }
                None => allowed -= 1,
            }
            tracked += 1;
        }
        (tracked, allowed)
    }
}

// Unit tests for `PendingCache`: what gets tracked, what is left behind when the cache is
// full, and that tracked items can be released again by their required key.
#[cfg(test)]
mod test {

use crate::tests::AlphaNumBytes;
use test_log::test;

use super::*;

// Added to an item's index to build the key that item requires.
const REQUIRED_OFFSET: usize = 1_000_000;
// Builds `num` pending items. Item `i` uses the decimal string of `i` as its key, the
// little-endian bytes of `i` as its value, and requires the key of `required_item(i)`.
fn get_items(num: usize) -> Vec<PendingItem<AlphaNumBytes>> {
let mut res = Vec::with_capacity(num);
for i in 0..num {
let id = AlphaNumBytes::from(format!("{}", i));
let req = required_item(i);
let item = ReconItem::new(id, i.to_le_bytes().to_vec());
res.push(PendingItem::new(req.key, item));
}

res
}

// The item that `get_items` item `i` depends on: its key is the decimal string of
// `i + REQUIRED_OFFSET` and its value is the bytes of that key.
fn required_item(i: usize) -> ReconItem<AlphaNumBytes> {
let req = AlphaNumBytes::from(format!("{}", i + REQUIRED_OFFSET));
let val = req.as_bytes().to_vec();
ReconItem::new(req, val)
}

// Tracks `items` in `cache` and asserts that exactly `expected_cached` of them were cached
// and the rest were left in the vector. It then releases every cached item through its
// required key and asserts that the cache ends up empty.
// NOTE(review): the final emptiness assertions only hold if the cache was empty on entry,
// which is the case for every caller in this module.
fn cache_and_assert(
cache: &mut PendingCache<AlphaNumBytes>,
mut items: Vec<PendingItem<AlphaNumBytes>>,
expected_cached: usize,
) {
// Keep a copy of the input: `track_pending` pops items off the vector it is given.
let expected_items = items.clone();
let expected_cache_size = cache.size() + expected_cached;
let expected_remaining = items.len() - expected_cached;
cache.track_pending(&mut items);
assert_eq!(expected_cache_size, cache.item_keys.len());
assert_eq!(expected_cache_size, cache.by_needed_key.len());
assert_eq!(expected_remaining, items.len(), "{:?}", items);

// we cache the vec as a stack so we need to use the original index from the list
// (items are popped from the end, so the cached ones are the last `expected_cached`)
let skip_offset = expected_items.len() - expected_cached;
for (i, v) in expected_items.into_iter().skip(skip_offset).enumerate() {
assert!(
cache.is_tracking(&v.item),
"not tracking: {:?} {:?}",
v,
cache
);
// `enumerate` restarts at 0 after `skip`, so add the offset back to get the item's index.
let req = required_item(i + skip_offset);
let cached = cache.remove_by_needed(&req).unwrap_or_else(|| {
panic!("should have cached {:?} by {:?} cache={:?}", v, req, cache)
});
assert_eq!(v.item, cached);
}

assert_eq!(0, cache.item_keys.len());
assert_eq!(0, cache.by_needed_key.len());
}

// A cache of size 10 accepts exactly 10 items.
#[test]
fn pending_caches_max() {
let mut cache = PendingCache::new(10);
let items = get_items(10);
cache_and_assert(&mut cache, items, 10);
}

// A cache with spare room accepts every item of consecutive batches.
#[test]
fn pending_caches_with_space() {
let mut cache = PendingCache::new(20);
let items = get_items(10);
cache_and_assert(&mut cache, items, 10);
let items = get_items(5);
cache_and_assert(&mut cache, items, 5);
}

// Given 20 items, a cache of size 10 tracks 10 and leaves the other 10 in the vector.
#[test]
fn pending_caches_drops_too_many() {
let mut cache = PendingCache::new(10);
let items = get_items(20);
cache_and_assert(&mut cache, items, 10);
}
}

0 comments on commit dff5ab9

Please sign in to comment.