diff --git a/Cargo.lock b/Cargo.lock
index 981e066ab..b460c762d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1261,6 +1261,7 @@ dependencies = [
  "dirs",
  "expect-test",
  "futures",
+ "futures-timer",
  "futures-util",
  "git-version",
  "glob",
@@ -1274,6 +1275,8 @@ dependencies = [
  "libp2p-mplex",
  "libp2p-quic",
  "lru 0.10.1",
+ "multihash 0.18.1",
+ "prometheus-client",
  "rand 0.8.5",
  "rand_chacha 0.3.1",
  "recon",
diff --git a/one/src/lib.rs b/one/src/lib.rs
index c01dc479f..63b8a36b1 100644
--- a/one/src/lib.rs
+++ b/one/src/lib.rs
@@ -424,8 +424,9 @@ impl Daemon {
         };
         let ipfs_metrics =
             ceramic_metrics::MetricsHandle::register(ceramic_kubo_rpc::IpfsMetrics::register);
+        let p2p_metrics = ceramic_metrics::MetricsHandle::register(ceramic_p2p::Metrics::register);
         let ipfs = Ipfs::builder()
-            .with_p2p(p2p_config, keypair, recons, sql_pool.clone())
+            .with_p2p(p2p_config, keypair, recons, sql_pool.clone(), p2p_metrics)
             .await?
             .build(sql_pool.clone(), ipfs_metrics)
             .await?;
diff --git a/one/src/network.rs b/one/src/network.rs
index f5db501fe..5a4f0c575 100644
--- a/one/src/network.rs
+++ b/one/src/network.rs
@@ -40,6 +40,7 @@ impl Builder {
         keypair: Keypair,
         recons: Option<(I, M)>,
         sql_pool: SqlitePool,
+        metrics: ceramic_p2p::Metrics,
     ) -> anyhow::Result>
     where
         I: Recon,
@@ -51,7 +52,7 @@
         config.libp2p = libp2p_config;

-        let mut p2p = Node::new(config, addr.clone(), keypair, recons, sql_pool).await?;
+        let mut p2p = Node::new(config, addr.clone(), keypair, recons, sql_pool, metrics).await?;

         let task = task::spawn(async move {
             if let Err(err) = p2p.run().await {
diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml
index 3179c520c..55f95751e 100644
--- a/p2p/Cargo.toml
+++ b/p2p/Cargo.toml
@@ -14,15 +14,22 @@ anyhow.workspace = true
 async-stream.workspace = true
 async-trait.workspace = true
 bytes.workspace = true
+ceramic-core.workspace = true
+ceramic-metrics.workspace = true
+chrono = "0.4.31"
 cid.workspace = true
-clap.workspace = true
 clap-verbosity-flag = "2.0.1"
+clap.workspace = true
 clio = { version = "0.3.4", features = ["clap-parse"] }
+dirs = "5.0.1"
+expect-test.workspace = true
+futures-timer = "3.0.2"
 futures-util.workspace = true
 futures.workspace = true
 git-version.workspace = true
+glob = "0.3.1"
+hex.workspace = true
 iroh-bitswap.workspace = true
-ceramic-metrics.workspace = true
 iroh-rpc-client.workspace = true
 iroh-rpc-types.workspace = true
 iroh-util.workspace = true
@@ -30,6 +37,8 @@ libp2p-identity.workspace = true
 libp2p-mplex = "0.40.0"
 libp2p-quic = { version = "0.9", features = ["tokio"] }
 lru.workspace = true
+multihash.workspace = true
+prometheus-client.workspace = true
 rand.workspace = true
 recon.workspace = true
 serde = { workspace = true, features = ["derive"] }
@@ -42,13 +51,6 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] }
 tracing.workspace = true
 void.workspace = true
 zeroize.workspace = true
-ceramic-core.workspace = true
-glob = "0.3.1"
-hex.workspace = true
-expect-test.workspace = true
-dirs = "5.0.1"
-chrono = "0.4.31"
-

 [dependencies.libp2p]
 workspace = true
diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs
index eda8bcb43..d9157bcac 100644
--- a/p2p/src/behaviour.rs
+++ b/p2p/src/behaviour.rs
@@ -25,7 +25,6 @@ use libp2p::{
 };
 use libp2p_identity::Keypair;
 use recon::{libp2p::Recon, Sha256a};
-use sqlx::SqlitePool;
 use tracing::{info, warn};

 pub use self::event::Event;
@@ -101,7 +100,7 @@ where
         config: &Libp2pConfig,
         relay_client: Option,
         recons: Option<(I, M)>,
-        sql_pool: SqlitePool,
+        block_store: SQLiteBlockStore,
     ) -> Result {
         let peer_manager = PeerManager::default();
         let pub_key = local_key.public();
@@ -115,8 +114,7 @@ where
             } else {
                 BitswapConfig::default_client_mode()
             };
-            let store: SQLiteBlockStore = SQLiteBlockStore::new(sql_pool).await?;
-            Some(Bitswap::<SQLiteBlockStore>::new(peer_id, store, bs_config).await)
+            Some(Bitswap::<SQLiteBlockStore>::new(peer_id, block_store, bs_config).await)
         } else {
             None
         }
@@ -132,22 +130,27 @@ where
         let kad = if config.kademlia {
             info!("init kademlia");
-            // TODO: persist to store
             let mem_store_config = MemoryStoreConfig {
-                // enough for >10gb of unixfs files at the default chunk size
-                max_records: 1024 * 64,
-                max_provided_keys: 1024 * 64,
+                // We do not store records.
+                max_records: 0,
+                max_provided_keys: 10_000_000,
+                max_providers_per_key: config.kademlia_replication_factor.into(),
                 ..Default::default()
             };
             let store = MemoryStore::with_config(peer_id, mem_store_config);

             let mut kad_config = kad::Config::default();
-            kad_config.set_replication_factor(config.kademlia_replication_factor);
-            kad_config.set_parallelism(config.kademlia_parallelism);
-            kad_config.set_query_timeout(config.kademlia_query_timeout);
-            kad_config.set_provider_record_ttl(config.kademlia_provider_record_ttl);
             kad_config
-                .set_provider_publication_interval(config.kademlia_provider_publication_interval);
+                .set_replication_factor(config.kademlia_replication_factor)
+                .set_parallelism(config.kademlia_parallelism)
+                .set_query_timeout(config.kademlia_query_timeout)
+                .set_provider_record_ttl(config.kademlia_provider_record_ttl)
+                // Disable record (re)-replication and (re)-publication
+                .set_replication_interval(None)
+                .set_publication_interval(None)
+                // Disable provider record (re)-publication
+                // Provider records are re-published via the [`crate::publisher::Publisher`].
+                .set_provider_publication_interval(None);

             let mut kademlia =
                 kad::Behaviour::with_config(pub_key.to_peer_id(), store, kad_config);

             for multiaddr in &config.bootstrap_peers {
diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs
index ed0d09673..486ea151c 100644
--- a/p2p/src/lib.rs
+++ b/p2p/src/lib.rs
@@ -1,8 +1,10 @@
 pub mod behaviour;
 pub mod config;
 mod keys;
+mod metrics;
 mod node;
 mod providers;
+mod publisher;
 pub mod rpc;
 mod sqliteblockstore;
 mod swarm;
@@ -10,6 +12,7 @@ mod swarm;
 pub use self::behaviour::Event;
 pub use self::config::*;
 pub use self::keys::{DiskStorage, Keychain, MemoryStorage};
+pub use self::metrics::Metrics;
 pub use self::node::*;
 pub use iroh_rpc_types::{GossipsubEvent, GossipsubEventStream};
 pub use libp2p::PeerId;
diff --git a/p2p/src/metrics.rs b/p2p/src/metrics.rs
new file mode 100644
index 000000000..7ef03f686
--- /dev/null
+++ b/p2p/src/metrics.rs
@@ -0,0 +1,61 @@
+use ceramic_metrics::Recorder;
+use prometheus_client::{
+    encoding::{EncodeLabelSet, EncodeLabelValue},
+    metrics::{counter::Counter, family::Family},
+    registry::Registry,
+};
+
+/// Metrics for Ceramic P2P events
+#[derive(Clone)]
+pub struct Metrics {
+    publish_results: Family<PublishResultsLabels, Counter>,
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+struct PublishResultsLabels {
+    result: PublishResult,
+}
+
+impl From<&PublishResult> for PublishResultsLabels {
+    fn from(value: &PublishResult) -> Self {
+        Self { result: *value }
+    }
+}
+
+#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
+pub enum PublishResult {
+    Success,
+    Failed,
+}
+
+impl Metrics {
+    /// Register and construct Metrics
+    pub fn register(registry: &mut Registry) -> Self {
+        let sub_registry = registry.sub_registry_with_prefix("p2p");
+
+        let publish_results = Family::<PublishResultsLabels, Counter>::default();
+        sub_registry.register(
+            "publish_results",
+            "Number of provider records results",
+            publish_results.clone(),
+        );
+
+        Self { publish_results }
+    }
+}
+
+pub enum Event {
+    PublishResult(PublishResult),
+}
+
+impl Recorder<Option<Event>> for Metrics {
+    fn record(&self, event: &Option<Event>) {
+        match event {
+            Some(Event::PublishResult(result)) => {
+                let labels = result.into();
+                self.publish_results.get_or_create(&labels).inc();
+            }
+            None => {}
+        }
+    }
+}
diff --git a/p2p/src/node.rs b/p2p/src/node.rs
index 822a23f5d..4b9609bb3 100644
--- a/p2p/src/node.rs
+++ b/p2p/src/node.rs
@@ -39,8 +39,6 @@ use tokio::sync::oneshot::{self, Sender as OneShotSender};
 use tokio::task::JoinHandle;
 use tracing::{debug, error, info, trace, warn};

-use crate::keys::{Keychain, Storage};
-use crate::providers::Providers;
 use crate::rpc::{P2p, ProviderRequestKey};
 use crate::swarm::build_swarm;
 use crate::GossipsubEvent;
@@ -49,6 +47,11 @@ use crate::{
     rpc::{self, RpcMessage},
     Config,
 };
+use crate::{
+    keys::{Keychain, Storage},
+    publisher::Publisher,
+};
+use crate::{metrics::Metrics, providers::Providers};
 use recon::{libp2p::Recon, Sha256a};

 #[allow(clippy::large_enum_variant)]
@@ -81,6 +84,7 @@ where
     use_dht: bool,
     bitswap_sessions: BitswapSessions,
     providers: Providers,
+    publisher: Publisher,
     listen_addrs: Vec,

     trust_observed_addrs: bool,
@@ -148,6 +152,7 @@ where
         keypair: Keypair,
         recons: Option<(I, M)>,
         sql_pool: SqlitePool,
+        metrics: Metrics,
     ) -> Result {
         let (network_sender_in, network_receiver_in) = channel(1024); // TODO: configurable
@@ -168,7 +173,8 @@ where
             .await
             .context("failed to create rpc client")?;

-        let mut swarm = build_swarm(&libp2p_config, &keypair, recons, sql_pool).await?;
+        let block_store = crate::SQLiteBlockStore::new(sql_pool).await?;
+        let mut swarm = build_swarm(&libp2p_config, &keypair, recons, block_store.clone()).await?;
         info!("iroh-p2p peerid: {}", swarm.local_peer_id());

         for addr in &libp2p_config.external_multiaddrs {
@@ -180,6 +186,13 @@ where
             Swarm::listen_on(&mut swarm, addr.clone())?;
             listen_addrs.push(addr.clone());
         }
+        let publisher = Publisher::new(
+            libp2p_config
+                .kademlia_provider_publication_interval
+                .unwrap_or_else(|| Duration::from_secs(12 * 60 * 60)),
+            block_store,
+            metrics,
+        );

         Ok(Node {
             swarm,
@@ -194,6 +207,7 @@ where
             use_dht: libp2p_config.kademlia,
             bitswap_sessions: Default::default(),
             providers: Providers::new(4),
+            publisher,
             listen_addrs,
             trust_observed_addrs: libp2p_config.trust_observed_addrs,
             failed_external_addresses: Default::default(),
@@ -254,6 +268,17 @@ where
                         }
                     }
                 }
+                provide_records = self.publisher.next() => {
+                    if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() {
+                        if let Some(provide_records) = provide_records {
+                            for record in provide_records {
+                                if let Err(err) = kad.start_providing(record.clone()) {
+                                    warn!(key=hex::encode(record.to_vec()), %err, "failed to provide record");
+                                }
+                            }
+                        }
+                    }
+                }
                 _ = async {
                     if let Some(ref mut nice_interval) = nice_interval {
                         nice_interval.tick().await
@@ -515,10 +540,9 @@ where
                 } = e
                 {
                     match result {
-                        QueryResult::StartProviding(result) => match result {
-                            Ok(_) => {}
-                            Err(err) => warn!("kad: failed to provide record: {}", err),
-                        },
+                        QueryResult::StartProviding(result) => {
+                            self.publisher.handle_start_providing_result(result);
+                        }
                         QueryResult::GetProviders(Ok(p)) => {
                             match p {
                                 GetProvidersOk::FoundProviders { key, providers } => {
@@ -1343,12 +1367,14 @@ mod tests {
         // Using an in memory DB for the tests for realistic benchmark disk DB is needed.
         let sql_pool = SqlitePool::connect("sqlite::memory:").await?;
+        let metrics = Metrics::register(&mut prometheus_client::registry::Registry::default());

         let mut p2p = Node::new(
             network_config,
             rpc_server_addr,
             keypair.into(),
             None::<(DummyRecon, DummyRecon)>,
             sql_pool,
+            metrics,
         )
         .await?;
         let cfg = iroh_rpc_client::Config {
diff --git a/p2p/src/publisher.rs b/p2p/src/publisher.rs
new file mode 100644
index 000000000..0225b6c65
--- /dev/null
+++ b/p2p/src/publisher.rs
@@ -0,0 +1,274 @@
+use std::{
+    collections::{HashMap, HashSet, VecDeque},
+    pin::Pin,
+    task::{Context, Poll},
+    time::{Duration, Instant},
+};
+
+use anyhow::Result;
+use ceramic_metrics::Recorder;
+use futures_timer::Delay;
+use futures_util::{future::BoxFuture, Future, Stream};
+use libp2p::kad::{record::Key, AddProviderError, AddProviderOk};
+use multihash::Multihash;
+use tokio::sync::oneshot;
+use tracing::{debug, warn};
+
+use crate::{
+    metrics::{self, Metrics},
+    SQLiteBlockStore,
+};
+
+// Performing many queries concurrently is efficient for adjacent keys.
+// We can use a large number here in order to take advantage of those efficiencies.
+const MAX_RUNNING_QUERIES: usize = 1000;
+// Number of historical durations to track.
+// Each batch tends to last up to the timeout because of the long tail of queries.
+// As such there is not much variance in the batch duration and a small history is sufficient.
+const ELAPSED_HISTORY_SIZE: usize = 10;
+// Retries are relatively cheap and missing a publish can have network wide negative effects.
+const MAX_RETRIES: usize = 10;
+/// Scale between 0 and 1 of how optimistic we are.
+/// A value of 0 means no optimism and therefore never delay the publisher loop.
+/// A value of 1 means perfect optimism and therefore delay for all of our estimated ability.
+const OPTIMISM: f64 = 0.5;
+
+// Manages publishing provider records regularly over an interval.
+pub struct Publisher {
+    metrics: Metrics,
+    state: State,
+    interval: Duration,
+    deadline: Instant,
+    current_queries: HashSet<Key>,
+    block_store: SQLiteBlockStore,
+    last_hash: Option<Multihash>,
+    batch_complete: Option<oneshot::Sender<()>>,
+    elapsed_history: VecDeque<Duration>,
+    retries: HashMap<Key, usize>,
+}
+
+enum State {
+    StartingFetch,
+    Delaying(Delay),
+    FetchingBatch {
+        start: Instant,
+        future: BoxFuture<'static, Result<Batch>>,
+    },
+    WaitingOnBatch {
+        start: Instant,
+        remaining: i64,
+        rx: oneshot::Receiver<()>,
+    },
+}
+
+impl Publisher {
+    pub fn new(interval: Duration, block_store: SQLiteBlockStore, metrics: Metrics) -> Self {
+        Self {
+            metrics,
+            state: State::StartingFetch,
+            deadline: Instant::now() + interval,
+            interval,
+            current_queries: HashSet::new(),
+            block_store,
+            last_hash: None,
+            batch_complete: None,
+            elapsed_history: VecDeque::with_capacity(ELAPSED_HISTORY_SIZE),
+            retries: Default::default(),
+        }
+    }
+    pub fn handle_start_providing_result(
+        &mut self,
+        result: Result<AddProviderOk, AddProviderError>,
+    ) {
+        let (key, metric_event) = match result {
+            Ok(AddProviderOk { key }) => {
+                self.retries.remove(&key);
+                (
+                    key,
+                    Some(metrics::Event::PublishResult(
+                        metrics::PublishResult::Success,
+                    )),
+                )
+            }
+            Err(AddProviderError::Timeout { key }) => {
+                let metrics_event = if self.current_queries.contains(&key) {
+                    let retry_count = self
+                        .retries
+                        .entry(key.clone())
+                        .and_modify(|count| *count += 1)
+                        .or_insert(1);
+                    if *retry_count > MAX_RETRIES {
+                        warn!(
+                            key = hex::encode(key.to_vec()),
+                            retries_attempted = MAX_RETRIES,
+                            "kad: failed to provide record: timeout"
+                        );
+                        self.retries.remove(&key);
+                        Some(metrics::Event::PublishResult(
+                            metrics::PublishResult::Failed,
+                        ))
+                    } else {
+                        None
+                    }
+                } else {
+                    None
+                };
+                (key, metrics_event)
+            }
+        };
+        self.metrics.record(&metric_event);
+        self.current_queries.remove(&key);
+
+        if self.current_queries.is_empty() {
+            if let Some(tx) = self.batch_complete.take() {
+                let _ = tx.send(());
+            }
+        }
+    }
+}
+
+struct Batch {
+    hashes: Vec<Multihash>,
+    remaining: i64,
+}
+
+impl Stream for Publisher {
+    type Item = Vec<Key>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // Loop until we reach a blocking state.
+        loop {
+            match &mut self.state {
+                State::Delaying(ref mut delay) => match Future::poll(Pin::new(delay), cx) {
+                    Poll::Ready(_) => {
+                        self.state = State::StartingFetch;
+                    }
+                    Poll::Pending => {
+                        return Poll::Pending;
+                    }
+                },
+                State::StartingFetch => {
+                    let block_store = self.block_store.clone();
+                    let last_hash = self.last_hash;
+                    let limit = (MAX_RUNNING_QUERIES - self.retries.len()) as i64;
+                    self.state = State::FetchingBatch {
+                        start: Instant::now(),
+                        future: Box::pin(async move {
+                            let (hashes, remaining) = block_store.range(last_hash, limit).await?;
+                            Ok(Batch { hashes, remaining })
+                        }),
+                    };
+                }
+                State::FetchingBatch {
+                    start,
+                    ref mut future,
+                } => {
+                    match Future::poll(future.as_mut(), cx) {
+                        Poll::Ready(Ok(batch)) => {
+                            if batch.hashes.is_empty() {
+                                let now = Instant::now();
+                                debug!(
+                                    deadline_seconds = self.deadline.duration_since(now).as_secs(),
+                                    "no more blocks, delaying until deadline"
+                                );
+                                // We reached the end of the blocks.
+                                // Delay until the deadline and reset it.
+                                self.last_hash = None;
+                                self.state = State::Delaying(Delay::new(self.deadline - now));
+                                self.deadline = now + self.interval;
+                            } else {
+                                let start = *start;
+                                self.last_hash = batch.hashes.iter().last().copied();
+                                let (tx, rx) = oneshot::channel();
+                                self.batch_complete = Some(tx);
+                                self.state = State::WaitingOnBatch {
+                                    start,
+                                    remaining: batch.remaining,
+                                    rx,
+                                };
+
+                                debug!(
+                                    new_count = batch.hashes.len(),
+                                    repeat_count = self.retries.len(),
+                                    max_retry_count = self.retries.values().max().unwrap_or(&0),
+                                    "starting new publish batch"
+                                );
+                                // Collect any keys that need to be retried and any new keys
+                                let keys: Vec<Key> = self
+                                    .retries
+                                    .keys()
+                                    .cloned()
+                                    .chain(
+                                        batch.hashes.into_iter().map(|hash| hash.to_bytes().into()),
+                                    )
+                                    .collect();
+                                keys.iter().for_each(|key| {
+                                    self.current_queries.insert(key.clone());
+                                });
+                                return Poll::Ready(Some(keys));
+                            }
+                        }
+                        Poll::Ready(Err(err)) => {
+                            warn!(%err, "failed to fetch next batch of blocks to publish");
+                            self.state = State::StartingFetch;
+                        }
+                        Poll::Pending => return Poll::Pending,
+                    }
+                }
+                State::WaitingOnBatch {
+                    start,
+                    remaining,
+                    rx,
+                } => {
+                    match Future::poll(Pin::new(rx), cx) {
+                        Poll::Ready(_) => {
+                            let remaining_batches =
+                                (*remaining as f64) / MAX_RUNNING_QUERIES as f64;
+                            let elapsed = start.elapsed();
+                            self.elapsed_history.push_front(elapsed);
+                            if self.elapsed_history.len() >= ELAPSED_HISTORY_SIZE {
+                                self.elapsed_history.pop_back();
+                            }
+                            let average = self
+                                .elapsed_history
+                                .iter()
+                                .sum::<Duration>()
+                                .div_f64(self.elapsed_history.len() as f64);
+                            let needed = average.mul_f64(remaining_batches);
+                            let now = Instant::now();
+                            let estimated_finish = now + needed;
+                            let spare = self.deadline.duration_since(estimated_finish);
+
+                            // Compute useful diagnostic values
+                            let needed_seconds = needed.as_secs_f64();
+                            let deadline_seconds = self.deadline.duration_since(now).as_secs_f64();
+                            let batch_average_seconds = average.as_secs();
+                            let lag_ratio = needed_seconds / deadline_seconds;
+
+                            if !spare.is_zero() {
+                                // Be conservative and adjust our delay based on our optimism of
+                                // the estimate.
+                                let delay = spare.div_f64(remaining_batches / OPTIMISM);
+                                debug!(
+                                    batch_average_seconds,
+                                    lag_ratio,
+                                    delay_seconds = delay.as_secs(),
+                                    "spare time, delaying, lag_ratio is (estimated needed time) / (remaining time before deadline)"
+                                );
+                                self.state = State::Delaying(Delay::new(delay));
+                            } else {
+                                warn!(
+                                    batch_average_seconds,
+                                    lag_ratio,
+                                    "publisher has no spare time, lag_ratio is (estimated needed time) / (remaining time before deadline)"
+                                );
+                                self.state = State::StartingFetch;
+                            }
+                        }
+                        Poll::Pending => return Poll::Pending,
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/p2p/src/sqliteblockstore.rs b/p2p/src/sqliteblockstore.rs
index 684b15cc0..609419e17 100644
--- a/p2p/src/sqliteblockstore.rs
+++ b/p2p/src/sqliteblockstore.rs
@@ -7,6 +7,7 @@ use cid::{
     Cid,
 };
 use iroh_bitswap::{Block, Store};
+use multihash::Multihash;
 use sqlx::{Row, SqlitePool};

 #[derive(Debug, Clone)]
@@ -45,6 +46,42 @@ impl SQLiteBlockStore {
             .await?;
         Ok(())
     }
+    /// Return a range of block hashes starting at hash exclusively.
+    pub async fn range(
+        &self,
+        hash: Option<Multihash>,
+        limit: i64,
+    ) -> Result<(Vec<Multihash>, i64)> {
+        let (hashes_query, remaining_query) = if let Some(hash) = hash {
+            (
+                sqlx::query(
+                    "SELECT multihash FROM blocks WHERE multihash > ? ORDER BY multihash LIMIT ?;",
+                )
+                .bind(hash.to_bytes())
+                .bind(limit),
+                sqlx::query("SELECT count(multihash) FROM blocks WHERE multihash > ?")
+                    .bind(hash.to_bytes()),
+            )
+        } else {
+            (
+                sqlx::query("SELECT multihash FROM blocks ORDER BY multihash LIMIT ?;").bind(limit),
+                sqlx::query("SELECT count(multihash) FROM blocks;"),
+            )
+        };
+        let hashes = hashes_query
+            .fetch_all(&self.pool)
+            .await?
+            .into_iter()
+            .map(|row| Multihash::from_bytes(row.get::<'_, &[u8], _>(0)))
+            .collect::<Result<Vec<Multihash>, multihash::Error>>()?;
+        let remaining = remaining_query
+            .fetch_one(&self.pool)
+            .await?
+            .get::<'_, i64, _>(0)
+            // Do not count the hashes we just got in the remaining count.
+            - (hashes.len() as i64);
+        Ok((hashes, remaining))
+    }

     pub async fn get_size(&self, cid: Cid) -> Result> {
         Ok(Some(
diff --git a/p2p/src/swarm.rs b/p2p/src/swarm.rs
index a5818d138..6bee0fe23 100644
--- a/p2p/src/swarm.rs
+++ b/p2p/src/swarm.rs
@@ -13,9 +13,8 @@ use libp2p::{
 };
 use libp2p_identity::Keypair;
 use recon::{libp2p::Recon, Sha256a};
-use sqlx::SqlitePool;

-use crate::{behaviour::NodeBehaviour, Libp2pConfig};
+use crate::{behaviour::NodeBehaviour, Libp2pConfig, SQLiteBlockStore};

 /// Builds the transport stack that LibP2P will communicate over.
 async fn build_transport(
@@ -106,7 +105,7 @@ pub(crate) async fn build_swarm(
     config: &Libp2pConfig,
     keypair: &Keypair,
     recons: Option<(I, M)>,
-    sql_pool: SqlitePool,
+    block_store: SQLiteBlockStore,
 ) -> Result>>
 where
     I: Recon,
     M: Recon,
@@ -115,7 +114,7 @@ where
     let peer_id = keypair.public().to_peer_id();
     let (transport, relay_client) = build_transport(keypair, config).await;

-    let behaviour = NodeBehaviour::new(keypair, config, relay_client, recons, sql_pool).await?;
+    let behaviour = NodeBehaviour::new(keypair, config, relay_client, recons, block_store).await?;

     let swarm_config = Config::with_tokio_executor()
         .with_notify_handler_buffer_size(config.notify_handler_buffer_size)
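
For reference, a minimal sketch (not part of the diff) of how a caller might page through every block multihash with the new SQLiteBlockStore::range API, the same batch-at-a-time pattern Publisher uses; the batch size of 1000 mirrors MAX_RUNNING_QUERIES in publisher.rs. It assumes SQLiteBlockStore is re-exported at the crate root (as the crate::SQLiteBlockStore call above suggests), and the helper name all_block_hashes is hypothetical.

use anyhow::Result;
use multihash::Multihash;

// Hypothetical helper: collect all block multihashes by paging in batches of 1000,
// the same way Publisher fetches one batch per publish round.
async fn all_block_hashes(store: &ceramic_p2p::SQLiteBlockStore) -> Result<Vec<Multihash>> {
    let mut all = Vec::new();
    let mut last: Option<Multihash> = None;
    loop {
        // `range` returns up to `limit` hashes strictly greater than `last`,
        // plus a count of how many hashes remain after this batch.
        let (hashes, remaining) = store.range(last, 1000).await?;
        if hashes.is_empty() {
            break;
        }
        last = hashes.iter().last().copied();
        all.extend(hashes);
        if remaining == 0 {
            break;
        }
    }
    Ok(all)
}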