feat: continuously publish provider records (#175)
nathanielc authored Nov 14, 2023
1 parent 75e2137 commit 7e2d3f4
Showing 11 changed files with 445 additions and 35 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion one/src/lib.rs
@@ -424,8 +424,9 @@ impl Daemon {
};
let ipfs_metrics =
ceramic_metrics::MetricsHandle::register(ceramic_kubo_rpc::IpfsMetrics::register);
let p2p_metrics = ceramic_metrics::MetricsHandle::register(ceramic_p2p::Metrics::register);
let ipfs = Ipfs::builder()
.with_p2p(p2p_config, keypair, recons, sql_pool.clone())
.with_p2p(p2p_config, keypair, recons, sql_pool.clone(), p2p_metrics)
.await?
.build(sql_pool.clone(), ipfs_metrics)
.await?;
3 changes: 2 additions & 1 deletion one/src/network.rs
@@ -40,6 +40,7 @@ impl Builder<Init> {
keypair: Keypair,
recons: Option<(I, M)>,
sql_pool: SqlitePool,
metrics: ceramic_p2p::Metrics,
) -> anyhow::Result<Builder<WithP2p>>
where
I: Recon<Key = Interest, Hash = Sha256a>,
@@ -51,7 +52,7 @@ impl Builder<Init> {

config.libp2p = libp2p_config;

let mut p2p = Node::new(config, addr.clone(), keypair, recons, sql_pool).await?;
let mut p2p = Node::new(config, addr.clone(), keypair, recons, sql_pool, metrics).await?;

let task = task::spawn(async move {
if let Err(err) = p2p.run().await {
20 changes: 11 additions & 9 deletions p2p/Cargo.toml
@@ -14,22 +14,31 @@ anyhow.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bytes.workspace = true
ceramic-core.workspace = true
ceramic-metrics.workspace = true
chrono = "0.4.31"
cid.workspace = true
clap.workspace = true
clap-verbosity-flag = "2.0.1"
clap.workspace = true
clio = { version = "0.3.4", features = ["clap-parse"] }
dirs = "5.0.1"
expect-test.workspace = true
futures-timer = "3.0.2"
futures-util.workspace = true
futures.workspace = true
git-version.workspace = true
glob = "0.3.1"
hex.workspace = true
iroh-bitswap.workspace = true
ceramic-metrics.workspace = true
iroh-rpc-client.workspace = true
iroh-rpc-types.workspace = true
iroh-util.workspace = true
libp2p-identity.workspace = true
libp2p-mplex = "0.40.0"
libp2p-quic = { version = "0.9", features = ["tokio"] }
lru.workspace = true
multihash.workspace = true
prometheus-client.workspace = true
rand.workspace = true
recon.workspace = true
serde = { workspace = true, features = ["derive"] }
@@ -42,13 +51,6 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing.workspace = true
void.workspace = true
zeroize.workspace = true
ceramic-core.workspace = true
glob = "0.3.1"
hex.workspace = true
expect-test.workspace = true
dirs = "5.0.1"
chrono = "0.4.31"


[dependencies.libp2p]
workspace = true
29 changes: 16 additions & 13 deletions p2p/src/behaviour.rs
@@ -25,7 +25,6 @@ use libp2p::{
};
use libp2p_identity::Keypair;
use recon::{libp2p::Recon, Sha256a};
use sqlx::SqlitePool;
use tracing::{info, warn};

pub use self::event::Event;
@@ -101,7 +100,7 @@
config: &Libp2pConfig,
relay_client: Option<relay::client::Behaviour>,
recons: Option<(I, M)>,
sql_pool: SqlitePool,
block_store: SQLiteBlockStore,
) -> Result<Self> {
let peer_manager = PeerManager::default();
let pub_key = local_key.public();
@@ -115,8 +114,7 @@
} else {
BitswapConfig::default_client_mode()
};
let store: SQLiteBlockStore = SQLiteBlockStore::new(sql_pool).await?;
Some(Bitswap::<SQLiteBlockStore>::new(peer_id, store, bs_config).await)
Some(Bitswap::<SQLiteBlockStore>::new(peer_id, block_store, bs_config).await)
} else {
None
}
@@ -132,22 +130,27 @@

let kad = if config.kademlia {
info!("init kademlia");
// TODO: persist to store
let mem_store_config = MemoryStoreConfig {
// enough for >10gb of unixfs files at the default chunk size
max_records: 1024 * 64,
max_provided_keys: 1024 * 64,
// We do not store records.
max_records: 0,
max_provided_keys: 10_000_000,
max_providers_per_key: config.kademlia_replication_factor.into(),
..Default::default()
};
let store = MemoryStore::with_config(peer_id, mem_store_config);

let mut kad_config = kad::Config::default();
kad_config.set_replication_factor(config.kademlia_replication_factor);
kad_config.set_parallelism(config.kademlia_parallelism);
kad_config.set_query_timeout(config.kademlia_query_timeout);
kad_config.set_provider_record_ttl(config.kademlia_provider_record_ttl);
kad_config
.set_provider_publication_interval(config.kademlia_provider_publication_interval);
.set_replication_factor(config.kademlia_replication_factor)
.set_parallelism(config.kademlia_parallelism)
.set_query_timeout(config.kademlia_query_timeout)
.set_provider_record_ttl(config.kademlia_provider_record_ttl)
// Disable record (re)-replication and (re)-publication
.set_replication_interval(None)
.set_publication_interval(None)
// Disable provider record (re)-publication
// Provider records are re-published via the [`crate::publisher::Publisher`].
.set_provider_publication_interval(None);

let mut kademlia = kad::Behaviour::with_config(pub_key.to_peer_id(), store, kad_config);
for multiaddr in &config.bootstrap_peers {
3 changes: 3 additions & 0 deletions p2p/src/lib.rs
@@ -1,15 +1,18 @@
pub mod behaviour;
pub mod config;
mod keys;
mod metrics;
mod node;
mod providers;
mod publisher;
pub mod rpc;
mod sqliteblockstore;
mod swarm;

pub use self::behaviour::Event;
pub use self::config::*;
pub use self::keys::{DiskStorage, Keychain, MemoryStorage};
pub use self::metrics::Metrics;
pub use self::node::*;
pub use iroh_rpc_types::{GossipsubEvent, GossipsubEventStream};
pub use libp2p::PeerId;
61 changes: 61 additions & 0 deletions p2p/src/metrics.rs
@@ -0,0 +1,61 @@
use ceramic_metrics::Recorder;
use prometheus_client::{
encoding::{EncodeLabelSet, EncodeLabelValue},
metrics::{counter::Counter, family::Family},
registry::Registry,
};

/// Metrics for Ceramic P2P events
#[derive(Clone)]
pub struct Metrics {
publish_results: Family<PublishResultsLabels, Counter>,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct PublishResultsLabels {
result: PublishResult,
}

impl From<&PublishResult> for PublishResultsLabels {
fn from(value: &PublishResult) -> Self {
Self { result: *value }
}
}

#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum PublishResult {
Success,
Failed,
}

impl Metrics {
/// Register and construct Metrics
pub fn register(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("p2p");

let publish_results = Family::<PublishResultsLabels, Counter>::default();
sub_registry.register(
"publish_results",
"Number of provider records results",
publish_results.clone(),
);

Self { publish_results }
}
}

pub enum Event {
PublishResult(PublishResult),
}

impl Recorder<Option<Event>> for Metrics {
fn record(&self, event: &Option<Event>) {
match event {
Some(Event::PublishResult(result)) => {
let labels = result.into();
self.publish_results.get_or_create(&labels).inc();
}
None => {}
}
}
}
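
For context on how these metrics are driven: `Metrics` implements `Recorder<Option<Event>>`, so the publishing code records each provider-record publish outcome as a labeled counter increment. A minimal sketch of a call site inside the crate (the helper function is hypothetical, not part of this commit; `Metrics`, `Event`, and `PublishResult` are the types defined above):

use ceramic_metrics::Recorder;

// Hypothetical helper inside ceramic-p2p illustrating how an outcome is recorded.
fn record_publish_outcome(metrics: &Metrics, succeeded: bool) {
    let result = if succeeded {
        PublishResult::Success
    } else {
        PublishResult::Failed
    };
    // Increments the `publish_results` counter for the matching result label.
    metrics.record(&Some(Event::PublishResult(result)));
}

Registration itself follows the pattern visible in one/src/lib.rs above: ceramic_metrics::MetricsHandle::register(ceramic_p2p::Metrics::register).
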
40 changes: 33 additions & 7 deletions p2p/src/node.rs
@@ -39,8 +39,6 @@ use tokio::sync::oneshot::{self, Sender as OneShotSender};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};

use crate::keys::{Keychain, Storage};
use crate::providers::Providers;
use crate::rpc::{P2p, ProviderRequestKey};
use crate::swarm::build_swarm;
use crate::GossipsubEvent;
@@ -49,6 +47,11 @@ use crate::{
rpc::{self, RpcMessage},
Config,
};
use crate::{
keys::{Keychain, Storage},
publisher::Publisher,
};
use crate::{metrics::Metrics, providers::Providers};
use recon::{libp2p::Recon, Sha256a};

#[allow(clippy::large_enum_variant)]
@@ -81,6 +84,7 @@
use_dht: bool,
bitswap_sessions: BitswapSessions,
providers: Providers,
publisher: Publisher,
listen_addrs: Vec<Multiaddr>,

trust_observed_addrs: bool,
@@ -148,6 +152,7 @@
keypair: Keypair,
recons: Option<(I, M)>,
sql_pool: SqlitePool,
metrics: Metrics,
) -> Result<Self> {
let (network_sender_in, network_receiver_in) = channel(1024); // TODO: configurable

@@ -168,7 +173,8 @@
.await
.context("failed to create rpc client")?;

let mut swarm = build_swarm(&libp2p_config, &keypair, recons, sql_pool).await?;
let block_store = crate::SQLiteBlockStore::new(sql_pool).await?;
let mut swarm = build_swarm(&libp2p_config, &keypair, recons, block_store.clone()).await?;
info!("iroh-p2p peerid: {}", swarm.local_peer_id());

for addr in &libp2p_config.external_multiaddrs {
@@ -180,6 +186,13 @@
Swarm::listen_on(&mut swarm, addr.clone())?;
listen_addrs.push(addr.clone());
}
let publisher = Publisher::new(
libp2p_config
.kademlia_provider_publication_interval
.unwrap_or_else(|| Duration::from_secs(12 * 60 * 60)),
block_store,
metrics,
);

Ok(Node {
swarm,
@@ -194,6 +207,7 @@
use_dht: libp2p_config.kademlia,
bitswap_sessions: Default::default(),
providers: Providers::new(4),
publisher,
listen_addrs,
trust_observed_addrs: libp2p_config.trust_observed_addrs,
failed_external_addresses: Default::default(),
@@ -254,6 +268,17 @@
}
}
}
provide_records = self.publisher.next() => {
if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() {
if let Some(provide_records) = provide_records {
for record in provide_records {
if let Err(err) = kad.start_providing(record.clone()) {
warn!(key=hex::encode(record.to_vec()), %err,"failed to provide record");
}
}
}
}
}
_ = async {
if let Some(ref mut nice_interval) = nice_interval {
nice_interval.tick().await
@@ -515,10 +540,9 @@
} = e
{
match result {
QueryResult::StartProviding(result) => match result {
Ok(_) => {}
Err(err) => warn!("kad: failed to provide record: {}", err),
},
QueryResult::StartProviding(result) => {
self.publisher.handle_start_providing_result(result);
}
QueryResult::GetProviders(Ok(p)) => {
match p {
GetProvidersOk::FoundProviders { key, providers } => {
@@ -1343,12 +1367,14 @@ mod tests {
// Using an in-memory DB for the tests; a disk DB is needed for realistic benchmarks.
let sql_pool = SqlitePool::connect("sqlite::memory:").await?;

let metrics = Metrics::register(&mut prometheus_client::registry::Registry::default());
let mut p2p = Node::new(
network_config,
rpc_server_addr,
keypair.into(),
None::<(DummyRecon<Interest>, DummyRecon<EventId>)>,
sql_pool,
metrics,
)
.await?;
let cfg = iroh_rpc_client::Config {
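
The `Publisher` type itself is defined in the new p2p/src/publisher.rs, one of the changed files not rendered here. Judging only from the call sites above (`Publisher::new(interval, block_store, metrics)`, `publisher.next()` yielding batches of record keys in the select! loop, and `handle_start_providing_result` feeding Kademlia results back), it is an interval-driven task that periodically re-emits the keys the node should provide. A simplified, self-contained stand-in (an illustration under those assumptions, not the actual implementation) might look like:

use std::time::Duration;
use libp2p::kad::RecordKey;
use tokio::time::{interval, Interval};

// Simplified stand-in for the commit's Publisher: every `period` it yields the
// batch of provider record keys that the node should re-provide via
// `kad.start_providing`. The real Publisher derives its keys from the SQLite
// block store and reports outcomes through `Metrics`.
struct IntervalPublisher {
    timer: Interval,
    keys: Vec<RecordKey>,
}

impl IntervalPublisher {
    fn new(period: Duration, keys: Vec<RecordKey>) -> Self {
        Self { timer: interval(period), keys }
    }

    // Mirrors the `provide_records = self.publisher.next()` arm in the run loop above.
    async fn next(&mut self) -> Option<Vec<RecordKey>> {
        self.timer.tick().await;
        Some(self.keys.clone())
    }
}

With the built-in Kademlia record and provider (re)publication intervals disabled in behaviour.rs, this task becomes the single driver of provider re-publication, and kademlia_provider_publication_interval (defaulting to 12 hours in node.rs) controls its period.
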