Skip to content

Commit

Permalink
chore: review comments
Browse files Browse the repository at this point in the history
- rename InsertBatch -> InsertResult
- use for loop in pending_cache instead of while loop
- consistent Self::Key usage in trait
  • Loading branch information
dav1do committed Aug 13, 2024
1 parent 623ec58 commit 2dd6e6b
Show file tree
Hide file tree
Showing 13 changed files with 50 additions and 52 deletions.
7 changes: 5 additions & 2 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ mod tests {
use futures::TryStreamExt;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use recon::{InsertBatch, RangeHash, ReconItem, Result as ReconResult, Sha256a, SyncState};
use recon::{InsertResult, RangeHash, ReconItem, Result as ReconResult, Sha256a, SyncState};
use ssh_key::private::Ed25519Keypair;

use libp2p::{identity::Keypair as Libp2pKeypair, kad::RecordKey};
Expand Down Expand Up @@ -1256,7 +1256,10 @@ mod tests {
type Key = K;
type Hash = Sha256a;

async fn insert(&self, _items: Vec<ReconItem<Self::Key>>) -> ReconResult<InsertBatch<K>> {
async fn insert(
&self,
_items: Vec<ReconItem<Self::Key>>,
) -> ReconResult<InsertResult<Self::Key>> {
unreachable!()
}

Expand Down
6 changes: 3 additions & 3 deletions recon/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tracing::warn;

use crate::{
recon::{RangeHash, ReconItem, SyncState},
AssociativeHash, Error, InsertBatch, InterestProvider, Key, Metrics, Recon, Result, Store,
AssociativeHash, Error, InsertResult, InterestProvider, Key, Metrics, Recon, Result, Store,
};

/// Client to a [`Recon`] [`Server`].
Expand All @@ -26,7 +26,7 @@ where
H: AssociativeHash,
{
/// Sends an insert request to the server and awaits the response.
pub async fn insert(&self, items: Vec<ReconItem<K>>) -> Result<InsertBatch<K>> {
pub async fn insert(&self, items: Vec<ReconItem<K>>) -> Result<InsertResult<K>> {
let (ret, rx) = oneshot::channel();
self.sender.send(Request::Insert { items, ret }).await?;
rx.await?
Expand Down Expand Up @@ -145,7 +145,7 @@ where
{
Insert {
items: Vec<ReconItem<K>>,
ret: oneshot::Sender<Result<InsertBatch<K>>>,
ret: oneshot::Sender<Result<InsertResult<K>>>,
},
Len {
ret: oneshot::Sender<Result<usize>>,
Expand Down
2 changes: 1 addition & 1 deletion recon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use crate::{
metrics::Metrics,
recon::{
btreestore::BTreeStore, pending_cache::PendingCache, AssociativeHash, EventIdStore,
FullInterests, HashCount, InsertBatch, InterestProvider, InterestStore, InvalidItem, Key,
FullInterests, HashCount, InsertResult, InterestProvider, InterestStore, InvalidItem, Key,
PendingItem, RangeHash, Recon, ReconInterestProvider, ReconItem, Split, Store, SyncState,
},
sha256a::Sha256a,
Expand Down
4 changes: 2 additions & 2 deletions recon/src/libp2p/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::Range;

use crate::{
libp2p::{stream_set::StreamSet, PeerEvent, PeerStatus},
AssociativeHash, BTreeStore, Error, FullInterests, HashCount, InsertBatch, InterestProvider,
AssociativeHash, BTreeStore, Error, FullInterests, HashCount, InsertResult, InterestProvider,
Key, Metrics, Recon, ReconItem, Result as ReconResult, Server, Store,
};

Expand Down Expand Up @@ -75,7 +75,7 @@ where
type Key = K;
type Hash = H;

async fn insert_many(&self, items: &[ReconItem<K>]) -> ReconResult<InsertBatch<Self::Key>> {
async fn insert_many(&self, items: &[ReconItem<K>]) -> ReconResult<InsertResult<Self::Key>> {
self.as_error()?;

self.inner.insert_many(items).await
Expand Down
14 changes: 8 additions & 6 deletions recon/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
ProtocolRun, ProtocolWriteLoop,
},
recon::{pending_cache::PendingCache, RangeHash, SyncState},
AssociativeHash, Client, InsertBatch, Key, ReconItem, Result as ReconResult,
AssociativeHash, Client, InsertResult, Key, ReconItem, Result as ReconResult,
};

// Limit to the number of pending range requests.
Expand Down Expand Up @@ -66,8 +66,8 @@ pub struct ProtocolConfig {
/// As we descend the tree and find smaller ranges, this won't apply as we have to flush
/// before recomputing a range, but it will be used when we're processing large ranges we don't yet have.
pub insert_batch_size: usize,
/// The maximum number of items that can not be stored in memory before we begin to drop them.
/// This is due to the event being dependent on another event we need to discover before it can be interpreted.
/// The maximum number of items that can be stored in memory before we begin to drop them.
/// This happens when an event is dependent on another event we need to discover before it can be interpreted.
pub max_pending_items: usize,
#[allow(dead_code)]
/// The ID of the peer we're syncing with.
Expand Down Expand Up @@ -832,8 +832,10 @@ pub trait Recon: Clone + Send + Sync + 'static {
type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>;

/// Insert new keys into the key space.
async fn insert(&self, items: Vec<ReconItem<Self::Key>>)
-> ReconResult<InsertBatch<Self::Key>>;
async fn insert(
&self,
items: Vec<ReconItem<Self::Key>>,
) -> ReconResult<InsertResult<Self::Key>>;

/// Get all keys in the specified range
async fn range(
Expand Down Expand Up @@ -889,7 +891,7 @@ where
type Key = K;
type Hash = H;

async fn insert(&self, items: Vec<ReconItem<K>>) -> ReconResult<InsertBatch<K>> {
async fn insert(&self, items: Vec<ReconItem<K>>) -> ReconResult<InsertResult<K>> {
Client::insert(self, items).await
}

Expand Down
14 changes: 7 additions & 7 deletions recon/src/recon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ where
}

/// Insert keys into the key space.
pub async fn insert(&self, items: Vec<ReconItem<K>>) -> Result<InsertBatch<K>> {
pub async fn insert(&self, items: Vec<ReconItem<K>>) -> Result<InsertResult<K>> {
let res = self.store.insert_many(&items).await?;
Ok(res)
}
Expand Down Expand Up @@ -465,7 +465,7 @@ impl<K: Key> PendingItem<K> {

/// The result of an insert operation.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct InsertBatch<K>
pub struct InsertResult<K>
where
K: Key,
{
Expand All @@ -481,7 +481,7 @@ where
pub pending: Vec<PendingItem<K>>,
}

impl<K> InsertBatch<K>
impl<K> InsertResult<K>
where
K: Key,
{
Expand All @@ -494,11 +494,11 @@ where
}
}

/// yah
/// Get the total count of items included whether added, pending or invalid
pub fn item_count(&self) -> usize {
self.new_cnt + self.invalid.len() + self.pending.len()
}
/// yayayay
/// Create with invalid or pending items
pub fn new_err(
new_cnt: usize,
invalid: Vec<InvalidItem<K>>,
Expand Down Expand Up @@ -533,7 +533,7 @@ pub trait Store {
/// Insert new keys into the key space.
/// Returns true for each key if it did not previously exist, in the
/// same order as the input iterator.
async fn insert_many(&self, items: &[ReconItem<Self::Key>]) -> Result<InsertBatch<Self::Key>>;
async fn insert_many(&self, items: &[ReconItem<Self::Key>]) -> Result<InsertResult<Self::Key>>;

/// Return the hash of all keys in the range between left_fencepost and right_fencepost.
/// The upper range bound is exclusive.
Expand Down Expand Up @@ -641,7 +641,7 @@ where
type Key = K;
type Hash = H;

async fn insert_many(&self, items: &[ReconItem<Self::Key>]) -> Result<InsertBatch<Self::Key>> {
async fn insert_many(&self, items: &[ReconItem<Self::Key>]) -> Result<InsertResult<Self::Key>> {
self.as_ref().insert_many(items).await
}

Expand Down
6 changes: 3 additions & 3 deletions recon/src/recon/btreestore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tracing::instrument;

use crate::{
recon::{AssociativeHash, Key, MaybeHashedKey, ReconItem, Store},
HashCount, InsertBatch, Result,
HashCount, InsertResult, Result,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -145,13 +145,13 @@ where
type Hash = H;

#[instrument(skip(self))]
async fn insert_many(&self, items: &[ReconItem<Self::Key>]) -> Result<InsertBatch<Self::Key>> {
async fn insert_many(&self, items: &[ReconItem<Self::Key>]) -> Result<InsertResult<Self::Key>> {
tracing::trace!("inserting items: {}", items.len());
let mut new = 0;
for item in items.iter() {
self.insert(item).await?.then(|| new += 1);
}
Ok(InsertBatch::new(new))
Ok(InsertResult::new(new))
}

async fn hash_range(&self, range: Range<&Self::Key>) -> Result<HashCount<Self::Hash>> {
Expand Down
27 changes: 10 additions & 17 deletions recon/src/recon/pending_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,16 @@ impl<K: Key> PendingCache<K> {
/// Returns a tuple of (items tracked, capacity remaining)
pub fn track_pending(&mut self, items: &mut Vec<PendingItem<K>>) -> (usize, usize) {
let mut tracked = 0;
let mut allowed = self.capacity();
if allowed > 0 {
while let Some(val) = items.pop() {
if !self.item_keys.contains(val.item.key.as_bytes()) {
self.item_keys.insert(val.item.key.as_bytes().to_vec());
self.by_needed_key
.insert(val.required_key.as_bytes().to_vec(), val.item);
allowed = allowed.saturating_sub(1);
tracked += 1;
}
if allowed == 0 || items.is_empty() {
break;
}
for val in items.drain(0..self.capacity().min(items.len())) {
if !self.item_keys.contains(val.item.key.as_bytes()) {
self.item_keys.insert(val.item.key.as_bytes().to_vec());
self.by_needed_key
.insert(val.required_key.as_bytes().to_vec(), val.item);
tracked += 1;
}
}
(tracked, allowed)

(tracked, self.capacity())
}
}

Expand Down Expand Up @@ -109,15 +103,14 @@ mod test {
assert_eq!(expected_remaining, items.len(), "{:?}", items);

// we cache the vec as a stack so we need to use the original index from the list
let skip_offset = expected_items.len() - expected_cached;
for (i, v) in expected_items.into_iter().skip(skip_offset).enumerate() {
for (i, v) in expected_items.into_iter().take(expected_cached).enumerate() {
assert!(
cache.is_tracking(&v.item),
"not tracking: {:?} {:?}",
v,
cache
);
let req = required_item(i + skip_offset);
let req = required_item(i);
let cached = cache.remove_by_needed(&req).unwrap_or_else(|| {
panic!("should have cached {:?} by {:?} cache={:?}", v, req, cache)
});
Expand Down
4 changes: 2 additions & 2 deletions service/src/event/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ impl recon::Store for CeramicEventService {
async fn insert_many(
&self,
items: &[ReconItem<Self::Key>],
) -> ReconResult<recon::InsertBatch<EventId>> {
) -> ReconResult<recon::InsertResult<EventId>> {
let res = self
.insert_events(items, DeliverableRequirement::Asap)
.await?;

Ok(recon::InsertBatch::new(res.store_result.count_new_keys()))
Ok(recon::InsertResult::new(res.store_result.count_new_keys()))
}

/// Return the hash of all keys in the range between left_fencepost and right_fencepost.
Expand Down
4 changes: 2 additions & 2 deletions service/src/interest/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::Range;

use ceramic_core::Interest;
use ceramic_store::CeramicOneInterest;
use recon::{HashCount, InsertBatch, ReconItem, Result as ReconResult, Sha256a};
use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a};
use tracing::instrument;

use crate::CeramicInterestService;
Expand All @@ -19,7 +19,7 @@ impl recon::Store for CeramicInterestService {
async fn insert_many(
&self,
items: &[ReconItem<Self::Key>],
) -> ReconResult<InsertBatch<Interest>> {
) -> ReconResult<InsertResult<Interest>> {
let keys = items.iter().map(|item| &item.key).collect::<Vec<_>>();
Ok(CeramicOneInterest::insert_many(&self.pool, &keys).await?)
}
Expand Down
4 changes: 2 additions & 2 deletions service/src/tests/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ceramic_api::{ApiItem, EventStore};
use cid::{Cid, CidGeneric};
use expect_test::expect;
use iroh_bitswap::Store;
use recon::{InsertBatch, ReconItem, Sha256a};
use recon::{InsertResult, ReconItem, Sha256a};

use super::*;

Expand Down Expand Up @@ -134,7 +134,7 @@ where
let actual = recon::Store::insert_many(&store, &[ReconItem::new(id.clone(), car1)])
.await
.unwrap();
assert_eq!(actual, InsertBatch::new(1));
assert_eq!(actual, InsertResult::new(1));

match recon::Store::insert_many(&store, &[ReconItem::new(id.clone(), car2)]).await {
Ok(_) => panic!("expected error"),
Expand Down
2 changes: 1 addition & 1 deletion store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ where
async fn insert_many(
&self,
items: &[ReconItem<Self::Key>],
) -> ReconResult<recon::InsertBatch<Self::Key>> {
) -> ReconResult<recon::InsertResult<Self::Key>> {
let res = StoreMetricsMiddleware::<S>::record(
&self.metrics,
"insert_many",
Expand Down
8 changes: 4 additions & 4 deletions store/src/sql/access/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::ops::Range;
use anyhow::anyhow;

use ceramic_core::Interest;
use recon::{AssociativeHash, HashCount, InsertBatch, Key, Sha256a};
use recon::{AssociativeHash, HashCount, InsertResult, Key, Sha256a};
use sqlx::Row;

use crate::{
Expand Down Expand Up @@ -67,9 +67,9 @@ impl CeramicOneInterest {
pub async fn insert_many(
pool: &SqlitePool,
items: &[&Interest],
) -> Result<InsertBatch<Interest>> {
) -> Result<InsertResult<Interest>> {
match items.len() {
0 => Ok(InsertBatch::new(0)),
0 => Ok(InsertResult::new(0)),
_ => {
let mut results = 0;
let mut tx = pool.begin_tx().await.map_err(Error::from)?;
Expand All @@ -80,7 +80,7 @@ impl CeramicOneInterest {
.then(|| results += 1);
}
tx.commit().await.map_err(Error::from)?;
Ok(InsertBatch::new(results))
Ok(InsertResult::new(results))
}
}
}
Expand Down

0 comments on commit 2dd6e6b

Please sign in to comment.