feat: modify recon store to support event validation concepts #473

Merged · 10 commits · Aug 15, 2024
7 changes: 5 additions & 2 deletions p2p/src/node.rs
@@ -1174,7 +1174,7 @@ mod tests {
use futures::TryStreamExt;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use recon::{RangeHash, ReconItem, Result as ReconResult, Sha256a, SyncState};
use recon::{InsertResult, RangeHash, ReconItem, Result as ReconResult, Sha256a, SyncState};
use ssh_key::private::Ed25519Keypair;

use libp2p::{identity::Keypair as Libp2pKeypair, kad::RecordKey};
@@ -1256,7 +1256,10 @@ mod tests {
type Key = K;
type Hash = Sha256a;

async fn insert(&self, _items: Vec<ReconItem<Self::Key>>) -> ReconResult<()> {
async fn insert(
&self,
_items: Vec<ReconItem<Self::Key>>,
) -> ReconResult<InsertResult<Self::Key>> {
unreachable!()
}

6 changes: 3 additions & 3 deletions recon/src/client.rs
@@ -7,7 +7,7 @@ use tracing::warn;

use crate::{
recon::{RangeHash, ReconItem, SyncState},
AssociativeHash, Error, InterestProvider, Key, Metrics, Recon, Result, Store,
AssociativeHash, Error, InsertResult, InterestProvider, Key, Metrics, Recon, Result, Store,
};

/// Client to a [`Recon`] [`Server`].
@@ -26,7 +26,7 @@ where
H: AssociativeHash,
{
/// Sends an insert request to the server and awaits the response.
pub async fn insert(&self, items: Vec<ReconItem<K>>) -> Result<()> {
pub async fn insert(&self, items: Vec<ReconItem<K>>) -> Result<InsertResult<K>> {
let (ret, rx) = oneshot::channel();
self.sender.send(Request::Insert { items, ret }).await?;
rx.await?
@@ -145,7 +145,7 @@ where
{
Insert {
items: Vec<ReconItem<K>>,
ret: oneshot::Sender<Result<()>>,
ret: oneshot::Sender<Result<InsertResult<K>>>,
},
Len {
ret: oneshot::Sender<Result<usize>>,
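Aside on the pattern above: `Client::insert` forwards the items to the Recon server task over an mpsc channel and waits for the reply on a oneshot channel. The stand-alone sketch below illustrates that request/response shape with simplified, hypothetical types (a `String` key and stripped-down `Request`/`InsertResult`); only the channel pattern mirrors this diff, the real server loop and types live in recon/src/client.rs.

```rust
use tokio::sync::{mpsc, oneshot};

// Simplified stand-ins for illustration; the real Request/InsertResult carry more data.
#[derive(Debug)]
struct InsertResult<K> {
    invalid: Vec<K>,
}

enum Request<K> {
    Insert {
        items: Vec<K>,
        ret: oneshot::Sender<InsertResult<K>>,
    },
}

// Server task: owns the store and answers each Insert over its oneshot channel.
async fn server(mut rx: mpsc::Receiver<Request<String>>) {
    while let Some(Request::Insert { items, ret }) = rx.recv().await {
        // A real store would validate `items` here; this stub accepts everything.
        let _ = items;
        let _ = ret.send(InsertResult { invalid: Vec::new() });
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    tokio::spawn(server(rx));

    let (ret, resp) = oneshot::channel();
    tx.send(Request::Insert { items: vec!["event-1".to_string()], ret })
        .await
        .unwrap();
    println!("insert result: {:?}", resp.await.unwrap());
}
```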
2 changes: 1 addition & 1 deletion recon/src/lib.rs
@@ -7,7 +7,7 @@ pub use crate::{
metrics::Metrics,
recon::{
btreestore::BTreeStore, AssociativeHash, EventIdStore, FullInterests, HashCount,
InsertResult, InterestProvider, InterestStore, Key, RangeHash, Recon,
InsertResult, InterestProvider, InterestStore, InvalidItem, Key, RangeHash, Recon,
ReconInterestProvider, ReconItem, Split, Store, SyncState,
},
sha256a::Sha256a,
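The crate root now also re-exports `InsertResult` and `InvalidItem`, the types a validating store uses to describe what happened to each submitted item. Their definitions are not part of this diff, so the sketch below uses hypothetical stand-ins (the `inserted`/`invalid`/`pending` fields and the `key` payloads are assumptions; only the type names and the `InvalidFormat`/`InvalidSignature` variants are taken from this PR) to show the kind of outcome a caller can now observe.

```rust
// Hypothetical, simplified shapes for illustration only; not the crate's definitions.
#[derive(Debug)]
enum InvalidItem<K> {
    InvalidFormat { key: K },
    InvalidSignature { key: K },
}

#[derive(Debug)]
struct InsertResult<K> {
    inserted: Vec<K>,             // items the store accepted
    invalid: Vec<InvalidItem<K>>, // items rejected during validation
    pending: u64,                 // items parked until their dependencies arrive
}

fn main() {
    let result = InsertResult {
        inserted: vec!["event-a".to_string()],
        invalid: vec![InvalidItem::InvalidSignature {
            key: "event-b".to_string(),
        }],
        pending: 1,
    };
    println!(
        "inserted={} invalid={} pending={}",
        result.inserted.len(),
        result.invalid.len(),
        result.pending
    );
}
```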
18 changes: 8 additions & 10 deletions recon/src/libp2p/protocol.rs
@@ -5,20 +5,16 @@ use libp2p::swarm::ConnectionId;
use libp2p_identity::PeerId;
use tracing::Level;

use crate::protocol::ProtocolConfig;
use crate::{
libp2p::stream_set::StreamSet,
protocol::{self, Recon},
};

// The max number of writes we'll batch up before flushing anything to disk.
// As we descend the tree and find smaller ranges, this won't apply as we have to flush
// before recomputing a range, but it will be used when we're processing large ranges we don't yet have.
const INSERT_BATCH_SIZE: usize = 100;

// Initiate Recon synchronization with a peer over a stream.
#[tracing::instrument(skip(recon, stream, ), ret(level = Level::DEBUG))]
pub async fn initiate_synchronize<S, R>(
remote_peer_id: PeerId, // included for context only
remote_peer_id: PeerId,
connection_id: ConnectionId, // included for context only
stream_set: StreamSet,
recon: R,
@@ -30,13 +26,14 @@ where
{
let codec = CborCodec::new();
let stream = Framed::new(stream, codec);
protocol::initiate_synchronize(recon, stream, INSERT_BATCH_SIZE).await?;
protocol::initiate_synchronize(recon, stream, ProtocolConfig::new_peer_id(remote_peer_id))
.await?;
Ok(stream_set)
}
// Initiate Recon synchronization with a peer over a stream.
#[tracing::instrument(skip(recon, stream, ), ret(level = Level::DEBUG))]
pub async fn respond_synchronize<S, R>(
remote_peer_id: PeerId, // included for context only
remote_peer_id: PeerId,
connection_id: ConnectionId, // included for context only
stream_set: StreamSet,
recon: R,
@@ -48,6 +45,7 @@ where
{
let codec = CborCodec::new();
let stream = Framed::new(stream, codec);
protocol::respond_synchronize(recon, stream, INSERT_BATCH_SIZE).await?;
protocol::respond_synchronize(recon, stream, ProtocolConfig::new_peer_id(remote_peer_id))
.await?;
Ok(stream_set)
}
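The per-sync `ProtocolConfig` replaces the module-level INSERT_BATCH_SIZE constant, carrying the remote peer id (and, presumably, the batching knob) into `protocol::initiate_synchronize`/`respond_synchronize`. The diff only shows the `new_peer_id` call, so the shape below is an inference for illustration, not the definition in the recon protocol module.

```rust
use libp2p_identity::PeerId;

// Inferred shape, for illustration only; field names and defaults are assumptions.
pub struct ProtocolConfig {
    /// How many received items to buffer before flushing to the store
    /// (the role the removed INSERT_BATCH_SIZE constant used to play).
    pub insert_batch_size: usize,
    /// The peer we are synchronizing with, available for logging and metrics.
    pub peer_id: PeerId,
}

impl ProtocolConfig {
    pub fn new_peer_id(peer_id: PeerId) -> Self {
        Self {
            insert_batch_size: 100,
            peer_id,
        }
    }
}
```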
2 changes: 1 addition & 1 deletion recon/src/libp2p/tests.rs
@@ -75,7 +75,7 @@ where
type Key = K;
type Hash = H;

async fn insert_many(&self, items: &[ReconItem<K>]) -> ReconResult<InsertResult> {
async fn insert_many(&self, items: &[ReconItem<K>]) -> ReconResult<InsertResult<Self::Key>> {
self.as_error()?;

self.inner.insert_many(items).await
53 changes: 52 additions & 1 deletion recon/src/metrics.rs
@@ -13,7 +13,7 @@ use prometheus_client::{

use crate::{
protocol::{InitiatorMessage, ResponderMessage},
AssociativeHash, Key,
AssociativeHash, InvalidItem, Key,
};

/// Metrics for Recon P2P events
@@ -24,6 +24,9 @@ pub struct Metrics {

protocol_write_loop_count: Counter,
protocol_run_duration: Histogram,

protocol_pending_items: Counter,
protocol_invalid_items: Family<InvalidItemLabels, Counter>,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
@@ -97,11 +100,27 @@ impl Metrics {
sub_registry
);

register!(
protocol_pending_items,
"Number of items received that depend on undiscovered events",
Counter::default(),
sub_registry
);

register!(
protocol_invalid_items,
"Number of items received that were considered invalid",
Family::<InvalidItemLabels, Counter>::default(),
sub_registry
);

Self {
protocol_message_received_count,
protocol_message_sent_count,
protocol_write_loop_count,
protocol_run_duration,
protocol_pending_items,
protocol_invalid_items,
}
}
}
@@ -145,3 +164,35 @@ impl Recorder<ProtocolRun> for Metrics {
self.protocol_run_duration.observe(event.0.as_secs_f64());
}
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub(crate) struct InvalidItemLabels {
pub(crate) reason: &'static str,
}

impl<K: Key> Recorder<InvalidItem<K>> for Metrics {
fn record(&self, event: &InvalidItem<K>) {
let labels = event.into();
self.protocol_invalid_items.get_or_create(&labels).inc();
}
}

impl<K: Key> From<&InvalidItem<K>> for InvalidItemLabels {
fn from(value: &InvalidItem<K>) -> Self {
match value {
InvalidItem::InvalidFormat { .. } => InvalidItemLabels {
reason: "InvalidFormat",
},
InvalidItem::InvalidSignature { .. } => InvalidItemLabels {
reason: "InvalidSignature",
},
}
}
}

pub(crate) struct PendingEvents(pub u64);
impl Recorder<PendingEvents> for Metrics {
fn record(&self, event: &PendingEvents) {
self.protocol_pending_items.inc_by(event.0);
}
}
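Taken together, the two new Recorder impls let the protocol layer count validation failures by reason and track how many events are waiting on undiscovered dependencies. A small usage sketch (assuming the same Recorder import as the rest of metrics.rs; how the `invalid` and `pending` values are obtained from the store's insert result is not shown here):

```rust
// Sketch only: record the outcome of a batch insert against the new counters.
fn record_insert_outcome<K: Key>(metrics: &Metrics, invalid: &[InvalidItem<K>], pending: u64) {
    for item in invalid {
        // Bumps protocol_invalid_items with reason = "InvalidFormat" or "InvalidSignature".
        metrics.record(item);
    }
    // Bumps protocol_pending_items by the number of events awaiting dependencies.
    metrics.record(&PendingEvents(pending));
}
```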