Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(libp2p): track bandwidth per transport protocol stack #4727

Merged
merged 29 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9c940fa
feat(libp2p): track bandwidth per transport protocol stack
mxinden Oct 25, 2023
e1b4497
Move bandwidth logging to misc/metrics
mxinden Oct 27, 2023
84f68e7
Add bandwidth metrics to server
mxinden Oct 27, 2023
c3e785d
Rename bandwidth metric
mxinden Oct 27, 2023
76cbf19
Use Counter instead of Collector
mxinden Oct 28, 2023
f378cc7
Introduce with_bandwidth_metrics builder step
mxinden Oct 28, 2023
8cc2b1b
Remove collector
mxinden Oct 28, 2023
67fb7cd
Remove license header
mxinden Oct 30, 2023
fd7c5d9
Fix metrics example
mxinden Oct 30, 2023
f9fd560
Depend on StreamMuxer and not StreamMuxerBox
mxinden Oct 30, 2023
fe46696
Adjust doc comment
mxinden Oct 30, 2023
60c6e7a
Use subregistry
mxinden Oct 30, 2023
cda371d
Fix dead-lock when cloning metrics
mxinden Oct 31, 2023
68414d5
Box closure wrapping muxer
mxinden Nov 1, 2023
689a947
Fix wasm compilation
mxinden Nov 1, 2023
aa0fa95
Use protocol stack
mxinden Nov 1, 2023
77a427b
Minor changes
mxinden Nov 1, 2023
314bf0f
Expose through BandwidthMetricTransport
mxinden Nov 1, 2023
297130a
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into b…
mxinden Nov 1, 2023
78e564b
Add changelog entry
mxinden Nov 1, 2023
3276d96
Bump misc/server version
mxinden Nov 1, 2023
7641531
Deprecate BandwidthLogging
mxinden Nov 1, 2023
190492b
fmt
mxinden Nov 1, 2023
bd83ba9
Update misc/server/CHANGELOG.md
mxinden Nov 4, 2023
164c812
Rename to BandwidthTransport
mxinden Nov 4, 2023
756241d
Allow deprecated at the top of module
mxinden Nov 4, 2023
b01f319
fmt
mxinden Nov 10, 2023
d9b6d7d
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into b…
mxinden Nov 10, 2023
f4c1ec7
Fix changelog and version
mxinden Nov 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion examples/metrics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ mod http_service;
fn main() -> Result<(), Box<dyn Error>> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();

let mut swarm = libp2p::SwarmBuilder::with_new_identity()
let (builder, bandwidth_logging) = libp2p::SwarmBuilder::with_new_identity()
.with_async_std()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_bandwidth_logging();
let mut swarm = builder
.with_behaviour(|key| Behaviour::new(key.public()))?
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)))
.build();
Expand All @@ -58,6 +60,7 @@ fn main() -> Result<(), Box<dyn Error>> {

let mut metric_registry = Registry::default();
let metrics = Metrics::new(&mut metric_registry);
libp2p::bandwidth::register_bandwidth_sinks(&mut metric_registry, bandwidth_logging);
mxinden marked this conversation as resolved.
Show resolved Hide resolved
thread::spawn(move || block_on(http_service::metrics_server(metric_registry)));

block_on(async {
Expand Down
1 change: 1 addition & 0 deletions libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` featu
instant = "0.1.12" # Explicit dependency to be used in `wasm-bindgen` feature
# TODO feature flag?
rw-stream-sink = { workspace = true }
prometheus-client = { workspace = true }

libp2p-allow-block-list = { workspace = true }
libp2p-autonat = { workspace = true, optional = true }
Expand Down
37 changes: 36 additions & 1 deletion libp2p/src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@
prelude::*,
ready,
};
use prometheus_client::{
encoding::{DescriptorEncoder, EncodeMetric},
metrics::{counter::ConstCounter, MetricType},
};
use std::{
collections::HashMap,
convert::TryFrom as _,
io,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Arc, RwLock,
},
task::{Context, Poll},
};
Expand Down Expand Up @@ -101,6 +106,7 @@
}

/// Allows obtaining the average bandwidth of the streams.
#[derive(Default, Debug)]
pub struct BandwidthSinks {
inbound: AtomicU64,
outbound: AtomicU64,
Expand All @@ -108,7 +114,7 @@

impl BandwidthSinks {
/// Returns a new [`BandwidthSinks`].
pub(crate) fn new() -> Arc<Self> {

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / examples

associated function `new` is never used

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / Compile with select features (mdns tcp dns async-std)

associated function `new` is never used

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / IPFS Integration tests

associated function `new` is never used

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / Test libp2p-perf

associated function `new` is never used
Arc::new(Self {
inbound: AtomicU64::new(0),
outbound: AtomicU64::new(0),
Expand Down Expand Up @@ -209,3 +215,32 @@
this.inner.poll_close(cx)
}
}

// TODO: Ideally this should go somewhere else. I.e. good to not depend on prometheus-client in libp2p.
pub fn register_bandwidth_sinks(
registry: &mut prometheus_client::registry::Registry,
sinks: Arc<RwLock<HashMap<String, Arc<BandwidthSinks>>>>,
) {
registry.register_collector(Box::new(SinksCollector(sinks)));
}

#[derive(Debug)]
struct SinksCollector(Arc<RwLock<HashMap<String, Arc<BandwidthSinks>>>>);

impl prometheus_client::collector::Collector for SinksCollector {
fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
let mut family_encoder =
encoder.encode_descriptor("bandwidth", "todo", None, MetricType::Counter)?;
mxinden marked this conversation as resolved.
Show resolved Hide resolved
for (protocols, sink) in self.0.read().expect("todo").iter() {
let labels = [("protocols", protocols.as_str()), ("direction", "inbound")];
let metric_encoder = family_encoder.encode_family(&labels)?;
ConstCounter::new(sink.inbound.load(Ordering::Relaxed)).encode(metric_encoder)?;

let labels = [("protocols", protocols.as_str()), ("direction", "outbound")];
let metric_encoder = family_encoder.encode_family(&labels)?;
ConstCounter::new(sink.outbound.load(Ordering::Relaxed)).encode(metric_encoder)?;
}

Ok(())
}
}
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 3 additions & 2 deletions libp2p/src/builder/phase/bandwidth_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use super::*;
use crate::bandwidth::BandwidthSinks;
use crate::transport_ext::TransportExt;
use crate::SwarmBuilder;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

pub struct BandwidthLoggingPhase<T, R> {
pub(crate) relay_behaviour: R,
Expand All @@ -17,7 +18,7 @@ impl<T: AuthenticatedMultiplexedTransport, Provider, R>
self,
) -> (
SwarmBuilder<Provider, BehaviourPhase<impl AuthenticatedMultiplexedTransport, R>>,
Arc<BandwidthSinks>,
Arc<RwLock<HashMap<String, Arc<BandwidthSinks>>>>,
) {
let (transport, sinks) = self.phase.transport.with_bandwidth_logging();
(
Expand Down
5 changes: 3 additions & 2 deletions libp2p/src/builder/phase/other_transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::convert::Infallible;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
use libp2p_core::Transport;
Expand Down Expand Up @@ -153,7 +154,7 @@ impl<Provider, T: AuthenticatedMultiplexedTransport>
Provider,
BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
>,
Arc<BandwidthSinks>,
Arc<RwLock<HashMap<String, Arc<BandwidthSinks>>>>,
) {
self.without_any_other_transports()
.without_dns()
Expand Down
7 changes: 5 additions & 2 deletions libp2p/src/builder/phase/quic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use crate::bandwidth::BandwidthSinks;
use crate::SwarmBuilder;
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
use libp2p_core::muxing::StreamMuxer;
Expand All @@ -8,7 +9,9 @@ use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
all(not(target_arch = "wasm32"), feature = "websocket")
))]
use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
use std::{marker::PhantomData, sync::Arc};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::{Arc, RwLock};

pub struct QuicPhase<T> {
pub(crate) transport: T,
Expand Down Expand Up @@ -254,7 +257,7 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Quic
Provider,
BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
>,
Arc<crate::bandwidth::BandwidthSinks>,
Arc<RwLock<HashMap<String, Arc<BandwidthSinks>>>>,
) {
self.without_quic()
.without_any_other_transports()
Expand Down
38 changes: 33 additions & 5 deletions libp2p/src/transport_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
Transport,
};
use libp2p_identity::PeerId;
use std::sync::Arc;
use multiaddr::Multiaddr;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};

/// Trait automatically implemented on all objects that implement `Transport`. Provides some
/// additional utilities.
Expand Down Expand Up @@ -66,7 +70,12 @@
///
/// let (transport, sinks) = transport.with_bandwidth_logging();
/// ```
fn with_bandwidth_logging<S>(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc<BandwidthSinks>)
fn with_bandwidth_logging<S>(
self,
) -> (
Boxed<(PeerId, StreamMuxerBox)>,
Arc<RwLock<HashMap<String, Arc<BandwidthSinks>>>>,
)
where
Self: Sized + Send + Unpin + 'static,
Self::Dial: Send + 'static,
Expand All @@ -74,16 +83,35 @@
Self::Error: Send + Sync,
Self::Output: Into<(PeerId, S)>,
S: StreamMuxer + Send + 'static,
S::Substream: Send + 'static,

Check failure on line 86 in libp2p/src/transport_ext.rs

View workflow job for this annotation

GitHub Actions / rustfmt

Diff in /home/runner/work/rust-libp2p/rust-libp2p/libp2p/src/transport_ext.rs
S::Error: Send + Sync + 'static,
{
let sinks = BandwidthSinks::new();
let sinks: Arc<RwLock<HashMap<_, Arc<BandwidthSinks>>>> = Arc::new(RwLock::new(HashMap::new()));
let sinks_copy = sinks.clone();
let transport = Transport::map(self, |output, _| {
let transport = Transport::map(self, move |output, connected_point| {
fn as_string(ma: &Multiaddr) -> String {
let len = ma
.protocol_stack()
.fold(0, |acc, proto| acc + proto.len() + 1);
let mut protocols = String::with_capacity(len);
for proto_tag in ma.protocol_stack() {
protocols.push('/');
protocols.push_str(proto_tag);
}
protocols
}

let sink = sinks_copy
.write()
.expect("todo")
.entry(as_string(connected_point.get_remote_address()))
.or_default()
.clone();

let (peer_id, stream_muxer_box) = output.into();
(
peer_id,
StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sinks_copy)),
StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sink)),
)
})
.boxed();
Expand Down
2 changes: 2 additions & 0 deletions misc/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@
mod ping;
mod protocol_stack;
#[cfg(feature = "relay")]
mod relay;

Check failure on line 42 in misc/metrics/src/lib.rs

View workflow job for this annotation

GitHub Actions / rustfmt

Diff in /home/runner/work/rust-libp2p/rust-libp2p/misc/metrics/src/lib.rs
mod swarm;

use std::{sync::{Arc, RwLock}, collections::HashMap};

Check failure on line 45 in misc/metrics/src/lib.rs

View workflow job for this annotation

GitHub Actions / Check rustdoc intra-doc links

unused imports: `Arc`, `RwLock`, `collections::HashMap`

Check failure on line 45 in misc/metrics/src/lib.rs

View workflow job for this annotation

GitHub Actions / clippy (nightly-2023-09-10)

unused imports: `Arc`, `RwLock`, `collections::HashMap`

Check failure on line 45 in misc/metrics/src/lib.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-unknown-unknown

unused imports: `Arc`, `RwLock`, `collections::HashMap`

Check failure on line 45 in misc/metrics/src/lib.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-unknown-emscripten

unused imports: `Arc`, `RwLock`, `collections::HashMap`

Check failure on line 45 in misc/metrics/src/lib.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-wasi

unused imports: `Arc`, `RwLock`, `collections::HashMap`

Check failure on line 45 in misc/metrics/src/lib.rs

View workflow job for this annotation

GitHub Actions / Compile with MSRV

unused imports: `Arc`, `RwLock`, `collections::HashMap`

Check failure on line 45 in misc/metrics/src/lib.rs

View workflow job for this annotation

GitHub Actions / Test libp2p

unused imports: `Arc`, `RwLock`, `collections::HashMap`

Check failure on line 45 in misc/metrics/src/lib.rs

View workflow job for this annotation

GitHub Actions / Test libp2p-metrics

unused imports: `Arc`, `RwLock`, `collections::HashMap`

Check failure on line 45 in misc/metrics/src/lib.rs

View workflow job for this annotation

GitHub Actions / Test libp2p-server

unused imports: `Arc`, `RwLock`, `collections::HashMap`

use prometheus_client::registry::Registry;

/// Set of Swarm and protocol metrics derived from emitted events.
Expand Down
Loading