Skip to content

Commit

Permalink
feat: add peering support
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Nov 20, 2023
1 parent 2e81450 commit 8ad0ab9
Show file tree
Hide file tree
Showing 14 changed files with 847 additions and 478 deletions.
1,047 changes: 601 additions & 446 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion beetle/iroh-bitswap/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl<S: Store> Client<S> {
inc!(BitswapMetrics::BlocksIn);
record!(
BitswapMetrics::ReceivedBlockBytes,
block.data().len() as u64
block.data().len() as i64
);
}
}
Expand Down
6 changes: 3 additions & 3 deletions beetle/iroh-bitswap/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl Session {
loop_state.want_blocks(keys).await;
},
Ok(Op::Cancel(keys)) => {
record!(BitswapMetrics::CancelBlocks, keys.len() as u64);
record!(BitswapMetrics::CancelBlocks, keys.len() as i64);
loop_state.session_wants.cancel_pending(&keys);
loop_state.session_want_sender.cancel(keys).await;
}
Expand Down Expand Up @@ -567,7 +567,7 @@ impl LoopState {
if wanted.is_empty() {
return;
}
record!(BitswapMetrics::WantedBlocksReceived, wanted.len() as u64);
record!(BitswapMetrics::WantedBlocksReceived, wanted.len() as i64);

// Record latency
self.latency_tracker
Expand All @@ -588,7 +588,7 @@ impl LoopState {

/// Called when blocks are requested by the client.
async fn want_blocks(&mut self, new_keys: Vec<Cid>) {
record!(BitswapMetrics::WantedBlocks, new_keys.len() as u64);
record!(BitswapMetrics::WantedBlocks, new_keys.len() as i64);
if !new_keys.is_empty() {
// Inform the SessionInterestManager that this session is interested in the keys.
self.session_interest_manager
Expand Down
2 changes: 1 addition & 1 deletion beetle/iroh-bitswap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl<S: Store> Bitswap<S> {

fn receive_message(&self, peer: PeerId, message: BitswapMessage) {
inc!(BitswapMetrics::MessagesReceived);
record!(BitswapMetrics::MessageBytesIn, message.encoded_len() as u64);
record!(BitswapMetrics::MessageBytesIn, message.encoded_len() as i64);
// TODO: Handle backpressure properly
if let Err(err) = self.incoming_messages.try_send((peer, message)) {
warn!(
Expand Down
4 changes: 2 additions & 2 deletions beetle/iroh-bitswap/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Network {
inc!(BitswapMetrics::MessagesAttempted);

let num_blocks = message.blocks().count();
let num_block_bytes = message.blocks().map(|b| b.data.len() as u64).sum();
let num_block_bytes = message.blocks().map(|b| b.data.len() as i64).sum();

tokio::time::timeout(timeout, async {
let mut errors: Vec<anyhow::Error> = Vec::new();
Expand All @@ -125,7 +125,7 @@ impl Network {
let (s, r) = oneshot::channel();
record!(
BitswapMetrics::MessageBytesOut,
message.clone().encoded_len() as u64
message.clone().encoded_len() as i64
);
self.network_out_sender
.send(OutEvent::SendMessage {
Expand Down
4 changes: 2 additions & 2 deletions beetle/iroh-bitswap/src/server/decision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,8 @@ impl<S: Store> Engine<S> {

if *counter % 100 == 0 {
let stats = self.peer_task_queue.stats().await;
record!(BitswapMetrics::EnginePendingTasks, stats.num_pending as u64);
record!(BitswapMetrics::EngineActiveTasks, stats.num_active as u64);
record!(BitswapMetrics::EnginePendingTasks, stats.num_pending as i64);
record!(BitswapMetrics::EngineActiveTasks, stats.num_active as i64);
}
}

Expand Down
4 changes: 2 additions & 2 deletions metrics/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub trait HistogramType {
#[deprecated = "use ceramic_metrics::Recorder instead"]
#[allow(deprecated)]
pub trait MetricsRecorder {
fn record<M>(&self, m: M, value: u64)
fn record<M>(&self, m: M, value: i64)
where
M: MetricType + std::fmt::Display;
fn observe<M>(&self, m: M, value: f64)
Expand All @@ -114,7 +114,7 @@ pub trait MetricsRecorder {
/// New metrics systems should use that trait.
#[deprecated = "use ceramic_metrics::Recorder instead"]
pub trait MRecorder {
fn record(&self, value: u64);
fn record(&self, value: i64);
}

/// This trait is inefficient as we do string comparisons to find the right metrics to update.
Expand Down
2 changes: 1 addition & 1 deletion metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub enum Collector {
}

#[allow(unused_variables, unreachable_patterns)]
pub fn record<M>(c: Collector, m: M, v: u64)
pub fn record<M>(c: Collector, m: M, v: i64)
where
M: MetricType + std::fmt::Display,
{
Expand Down
12 changes: 10 additions & 2 deletions metrics/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ macro_rules! inc {
};
}

#[macro_export]
macro_rules! dec {
( $e:expr) => {
#[allow(deprecated)]
$e.record(-1);
};
}

#[macro_export]
macro_rules! observe {
( $e:expr, $v:expr) => {
Expand Down Expand Up @@ -80,7 +88,7 @@ macro_rules! make_metrics {
}

impl MetricsRecorder for Metrics {
fn record<M>(&self, m: M, value: u64)
fn record<M>(&self, m: M, value: i64)
where
M: MetricType + std::fmt::Display,
{
Expand Down Expand Up @@ -120,7 +128,7 @@ macro_rules! make_metrics {
}

impl MRecorder for [<$module_name Metrics>] {
fn record(&self, value: u64) {
fn record(&self, value: i64) {
$crate::record(Collector::$module_name, self.clone(), value);
}
}
Expand Down
40 changes: 30 additions & 10 deletions metrics/src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::fmt;

use prometheus_client::{metrics::counter::Counter, registry::Registry};
use prometheus_client::{
metrics::{counter::Counter, gauge::Gauge},
registry::Registry,
};
use tracing::error;

use crate::{
Expand All @@ -14,6 +17,7 @@ pub(crate) type Libp2pMetrics = libp2p::metrics::Metrics;
pub(crate) struct Metrics {
bad_peers: Counter,
bad_peers_removed: Counter,
bootstrap_peers_connected: Gauge,
skipped_peer_bitswap: Counter,
skipped_peer_kad: Counter,
loops: Counter,
Expand All @@ -38,6 +42,13 @@ impl Metrics {
bad_peers_removed.clone(),
);

let bootstrap_peers_connected = Gauge::default();
sub_registry.register(
P2PMetrics::BootstrapPeersConnected.name(),
"",
bootstrap_peers_connected.clone(),
);

let skipped_peer_bitswap = Counter::default();
sub_registry.register(
P2PMetrics::SkippedPeerBitswap.name(),
Expand All @@ -57,6 +68,7 @@ impl Metrics {
Self {
bad_peers,
bad_peers_removed,
bootstrap_peers_connected,
skipped_peer_bitswap,
skipped_peer_kad,
loops,
Expand All @@ -65,37 +77,44 @@ impl Metrics {
}

impl MetricsRecorder for Metrics {
fn record<M>(&self, m: M, value: u64)
fn record<M>(&self, m: M, value: i64)
where
M: MetricType + std::fmt::Display,
{
// Counters should never record negative values
if m.name() == P2PMetrics::BadPeer.name() {
self.bad_peers.inc_by(value);
self.bad_peers.inc_by(value.unsigned_abs());
} else if m.name() == P2PMetrics::BadPeerRemoved.name() {
self.bad_peers_removed.inc_by(value.unsigned_abs());
} else if m.name() == P2PMetrics::BootstrapPeersConnected.name() {
// This is a gauge and can record negative values
self.bootstrap_peers_connected.inc_by(value);
} else if m.name() == P2PMetrics::BadPeerRemoved.name() {
self.bad_peers_removed.inc_by(value);
self.bad_peers_removed.inc_by(value.unsigned_abs());
} else if m.name() == P2PMetrics::SkippedPeerBitswap.name() {
self.skipped_peer_bitswap.inc_by(value);
self.skipped_peer_bitswap.inc_by(value.unsigned_abs());
} else if m.name() == P2PMetrics::SkippedPeerKad.name() {
self.skipped_peer_kad.inc_by(value);
self.skipped_peer_kad.inc_by(value.unsigned_abs());
} else if m.name() == P2PMetrics::LoopCounter.name() {
self.loops.inc_by(value);
self.loops.inc_by(value.unsigned_abs());
} else {
error!("record (bitswap): unknown metric {}", m.name());
error!("record (p2p): unknown metric {}", m.name());
}
}

fn observe<M>(&self, m: M, _value: f64)
where
M: HistogramType + std::fmt::Display,
{
error!("observe (bitswap): unknown metric {}", m.name());
error!("observe (p2p): unknown metric {}", m.name());
}
}

#[derive(Clone, Debug)]
pub enum P2PMetrics {
BadPeer,
BadPeerRemoved,
BootstrapPeersConnected,
SkippedPeerBitswap,
SkippedPeerKad,
LoopCounter,
Expand All @@ -106,6 +125,7 @@ impl MetricType for P2PMetrics {
match self {
P2PMetrics::BadPeer => "bad_peer",
P2PMetrics::BadPeerRemoved => "bad_peer_removed",
P2PMetrics::BootstrapPeersConnected => "bootstrap_peers_connected",
P2PMetrics::SkippedPeerBitswap => "skipped_peer_bitswap",
P2PMetrics::SkippedPeerKad => "skipped_peer_kad",
P2PMetrics::LoopCounter => "loop_counter",
Expand All @@ -114,7 +134,7 @@ impl MetricType for P2PMetrics {
}

impl MRecorder for P2PMetrics {
fn record(&self, value: u64) {
fn record(&self, value: i64) {
crate::record(Collector::P2P, self.clone(), value);
}
}
Expand Down
21 changes: 20 additions & 1 deletion one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ struct DaemonOpts {
)]
swarm_addresses: Vec<String>,

/// Address of bootstrap peers.
/// There are no default addresses, use this arg or the API to connect to bootstrap peers as needed.
#[arg(
long,
use_value_delimiter = true,
value_delimiter = ',',
env = "CERAMIC_ONE_BOOTSTRAP_ADDRESSES"
)]
bootstrap_addresses: Vec<String>,

/// Path to storage directory
#[arg(short, long, env = "CERAMIC_ONE_STORE_DIR")]
store_dir: Option<PathBuf>,
Expand Down Expand Up @@ -371,7 +381,16 @@ impl Daemon {
max_conns_pending_in: opts.max_conns_pending_in,
max_conns_per_peer: opts.max_conns_per_peer,
idle_connection_timeout: Duration::from_millis(opts.idle_conns_timeout_ms),
bootstrap_peers: opts.network.bootstrap_addresses(),
// Add injected bootstrap addresses to the list of official bootstrap nodes, so that our bootstrap nodes are
// always included.
bootstrap_peers: [
opts.network.bootstrap_addresses(),
opts.bootstrap_addresses
.iter()
.map(|addr| addr.parse())
.collect::<Result<Vec<Multiaddr>, multiaddr::Error>>()?,
]
.concat(),
listening_multiaddrs: opts
.swarm_addresses
.iter()
Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ahash.workspace = true
anyhow.workspace = true
async-stream.workspace = true
async-trait.workspace = true
backoff.workspace = true
bytes.workspace = true
ceramic-core.workspace = true
ceramic-metrics.workspace = true
Expand Down
3 changes: 1 addition & 2 deletions p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ where
recons: Option<(I, M)>,
block_store: SQLiteBlockStore,
) -> Result<Self> {
let peer_manager = PeerManager::default();
let pub_key = local_key.public();
let peer_id = pub_key.to_peer_id();

Expand Down Expand Up @@ -252,7 +251,7 @@ where
dcutr: dcutr.into(),
relay_client: relay_client.into(),
gossipsub,
peer_manager,
peer_manager: PeerManager::new(&config.bootstrap_peers),
limits,
recon: recon.into(),
})
Expand Down
Loading

0 comments on commit 8ad0ab9

Please sign in to comment.