-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: continuously publish provider records #175
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
use ceramic_metrics::Recorder; | ||
use prometheus_client::{ | ||
encoding::{EncodeLabelSet, EncodeLabelValue}, | ||
metrics::{counter::Counter, family::Family}, | ||
registry::Registry, | ||
}; | ||
|
||
/// Metrics for Ceramic P2P events | ||
#[derive(Clone)] | ||
pub struct Metrics { | ||
publish_results: Family<PublishResultsLabels, Counter>, | ||
} | ||
|
||
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] | ||
struct PublishResultsLabels { | ||
result: PublishResult, | ||
} | ||
|
||
impl From<&PublishResult> for PublishResultsLabels { | ||
fn from(value: &PublishResult) -> Self { | ||
Self { result: *value } | ||
} | ||
} | ||
|
||
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)] | ||
pub enum PublishResult { | ||
Success, | ||
Failed, | ||
} | ||
|
||
impl Metrics { | ||
/// Register and construct Metrics | ||
pub fn register(registry: &mut Registry) -> Self { | ||
let sub_registry = registry.sub_registry_with_prefix("p2p"); | ||
|
||
let publish_results = Family::<PublishResultsLabels, Counter>::default(); | ||
sub_registry.register( | ||
"publish_results", | ||
"Number of provider records results", | ||
publish_results.clone(), | ||
); | ||
|
||
Self { publish_results } | ||
} | ||
} | ||
|
||
pub enum Event { | ||
PublishResult(PublishResult), | ||
} | ||
|
||
impl Recorder<Option<Event>> for Metrics { | ||
fn record(&self, event: &Option<Event>) { | ||
match event { | ||
Some(Event::PublishResult(result)) => { | ||
let labels = result.into(); | ||
self.publish_results.get_or_create(&labels).inc(); | ||
} | ||
None => {} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,8 +39,6 @@ use tokio::sync::oneshot::{self, Sender as OneShotSender}; | |
use tokio::task::JoinHandle; | ||
use tracing::{debug, error, info, trace, warn}; | ||
|
||
use crate::keys::{Keychain, Storage}; | ||
use crate::providers::Providers; | ||
use crate::rpc::{P2p, ProviderRequestKey}; | ||
use crate::swarm::build_swarm; | ||
use crate::GossipsubEvent; | ||
|
@@ -49,6 +47,11 @@ use crate::{ | |
rpc::{self, RpcMessage}, | ||
Config, | ||
}; | ||
use crate::{ | ||
keys::{Keychain, Storage}, | ||
publisher::Publisher, | ||
}; | ||
use crate::{metrics::Metrics, providers::Providers}; | ||
use recon::{libp2p::Recon, Sha256a}; | ||
|
||
#[allow(clippy::large_enum_variant)] | ||
|
@@ -81,6 +84,7 @@ where | |
use_dht: bool, | ||
bitswap_sessions: BitswapSessions, | ||
providers: Providers, | ||
publisher: Publisher, | ||
listen_addrs: Vec<Multiaddr>, | ||
|
||
trust_observed_addrs: bool, | ||
|
@@ -148,6 +152,7 @@ where | |
keypair: Keypair, | ||
recons: Option<(I, M)>, | ||
sql_pool: SqlitePool, | ||
metrics: Metrics, | ||
) -> Result<Self> { | ||
let (network_sender_in, network_receiver_in) = channel(1024); // TODO: configurable | ||
|
||
|
@@ -168,7 +173,8 @@ where | |
.await | ||
.context("failed to create rpc client")?; | ||
|
||
let mut swarm = build_swarm(&libp2p_config, &keypair, recons, sql_pool).await?; | ||
let block_store = crate::SQLiteBlockStore::new(sql_pool).await?; | ||
let mut swarm = build_swarm(&libp2p_config, &keypair, recons, block_store.clone()).await?; | ||
info!("iroh-p2p peerid: {}", swarm.local_peer_id()); | ||
|
||
for addr in &libp2p_config.external_multiaddrs { | ||
|
@@ -180,6 +186,13 @@ where | |
Swarm::listen_on(&mut swarm, addr.clone())?; | ||
listen_addrs.push(addr.clone()); | ||
} | ||
let publisher = Publisher::new( | ||
libp2p_config | ||
.kademlia_provider_publication_interval | ||
.unwrap_or_else(|| Duration::from_secs(12 * 60 * 60)), | ||
block_store, | ||
metrics, | ||
); | ||
|
||
Ok(Node { | ||
swarm, | ||
|
@@ -194,6 +207,7 @@ where | |
use_dht: libp2p_config.kademlia, | ||
bitswap_sessions: Default::default(), | ||
providers: Providers::new(4), | ||
publisher, | ||
listen_addrs, | ||
trust_observed_addrs: libp2p_config.trust_observed_addrs, | ||
failed_external_addresses: Default::default(), | ||
|
@@ -254,6 +268,17 @@ where | |
} | ||
} | ||
} | ||
provide_records = self.publisher.next() => { | ||
if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() { | ||
if let Some(provide_records) = provide_records { | ||
for record in provide_records { | ||
if let Err(err) = kad.start_providing(record.clone()) { | ||
warn!(key=hex::encode(record.to_vec()), %err,"failed to provide record"); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
_ = async { | ||
if let Some(ref mut nice_interval) = nice_interval { | ||
nice_interval.tick().await | ||
|
@@ -515,10 +540,9 @@ where | |
} = e | ||
{ | ||
match result { | ||
QueryResult::StartProviding(result) => match result { | ||
Ok(_) => {} | ||
Err(err) => warn!("kad: failed to provide record: {}", err), | ||
}, | ||
QueryResult::StartProviding(result) => { | ||
self.publisher.handle_start_providing_result(result); | ||
} | ||
QueryResult::GetProviders(Ok(p)) => { | ||
match p { | ||
GetProvidersOk::FoundProviders { key, providers } => { | ||
|
@@ -1343,12 +1367,14 @@ mod tests { | |
// Using an in memory DB for the tests for realistic benchmark disk DB is needed. | ||
let sql_pool = SqlitePool::connect("sqlite::memory:").await?; | ||
|
||
let metrics = Metrics::register(&mut prometheus_client::registry::Registry::default()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this recording metrics in tests? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is because I didn't abstract over optional metrics in the code. The types require a Metrics instance in order to be constructed. So we do not validate metrics in tests yet, but this is how we could. |
||
let mut p2p = Node::new( | ||
network_config, | ||
rpc_server_addr, | ||
keypair.into(), | ||
None::<(DummyRecon<Interest>, DummyRecon<EventId>)>, | ||
sql_pool, | ||
metrics, | ||
) | ||
.await?; | ||
let cfg = iroh_rpc_client::Config { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should reference that we are doing this somewhere else. This looks like we are not publishing provider records.