Skip to content

Commit

Permalink
fix: upgrade to libp2p 0.53 (#205)
Browse files Browse the repository at this point in the history
Libp2p 0.53 has three major features we want:

1. Stable support for QUIC
2. Full tracing support
3. Uses prometheus_client 0.22, which enables us to use tokio-metrics

In order to upgrade we needed to address the following major breaking
changes:

* KeepAlive is now a boolean signal (bitswap keep alive logic is now
  much simpler)
* Supported Protocols are only known per connection, we now hard code
  the list for diagnostic reasons

There are a few other minor breaking changes, but they did not affect our
code much.
  • Loading branch information
nathanielc authored Dec 5, 2023
1 parent 2196320 commit 9a8c017
Show file tree
Hide file tree
Showing 20 changed files with 579 additions and 694 deletions.
538 changes: 246 additions & 292 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ keyed_priority_queue = "0.4.1"
lazy_static = "1.4"
libipld = "0.16"
libipld-cbor = "0.16"
libp2p = { version = "0.52.4", default-features = false }
libp2p = { version = "0.53", default-features = false }
libp2p-identity = { version = "0.2", features = ["peerid", "ed25519"] }
lru = "0.10"
mime = "0.3"
Expand All @@ -116,7 +116,7 @@ opentelemetry-otlp = "0.11"
par-stream = { version = "0.10.2", default-features = false }
paste = "1.0.9"
phf = "0.11"
prometheus-client = "0.21"
prometheus-client = "0.22"
proptest = "1"
prost = "0.11"
prost-build = "0.11.1"
Expand Down Expand Up @@ -165,9 +165,7 @@ tokio = { version = "1", default-features = false, features = [
tokio-context = "0.1.3"
tokio-stream = "0.1.11"
tokio-test = "0.4.2"
tokio-util = { version = "0.7.10", default-features = false, features = [
"compat",
] }
tokio-util = { version = "0.7.10", features = ["compat", "rt"] }
toml = "0.5.9"
tower = "0.4"
tower-http = "0.3"
Expand Down
1 change: 1 addition & 0 deletions beetle/iroh-bitswap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ libp2p = { workspace = true, features = ["yamux", "noise", "tcp", "tokio"] }
tokio = { workspace = true, features = ["macros", "net", "rt"] }
tokio-util = { workspace = true, features = ["compat"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
test-log.workspace = true


[[bench]]
Expand Down
67 changes: 22 additions & 45 deletions beetle/iroh-bitswap/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
collections::VecDeque,
fmt::Debug,
task::{Context, Poll},
time::{Duration, Instant},
time::Duration,
};

use asynchronous_codec::Framed;
Expand All @@ -17,12 +17,12 @@ use futures::{
use libp2p::swarm::handler::FullyNegotiatedInbound;
use libp2p::swarm::{
handler::{DialUpgradeError, FullyNegotiatedOutbound},
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol,
};
use libp2p::PeerId;
use smallvec::SmallVec;
use tokio::sync::oneshot;
use tracing::{trace, warn};
use tracing::{debug, trace, warn};

use crate::{
error::Error,
Expand All @@ -31,10 +31,6 @@ use crate::{
protocol::{BitswapCodec, ProtocolConfig, ProtocolId},
};

/// The initial time (in seconds) we set the keep alive for protocol negotiations to occur.
// TODO: configurable
const INITIAL_KEEP_ALIVE: u64 = 30;

#[derive(thiserror::Error, Debug)]
pub enum BitswapHandlerError {
/// The message exceeds the maximum transmission size.
Expand Down Expand Up @@ -85,12 +81,8 @@ pub enum BitswapHandlerIn {
Unprotect,
}

type BitswapConnectionHandlerEvent = ConnectionHandlerEvent<
ProtocolConfig,
(BitswapMessage, BitswapMessageResponse),
HandlerEvent,
BitswapHandlerError,
>;
type BitswapConnectionHandlerEvent =
ConnectionHandlerEvent<ProtocolConfig, (BitswapMessage, BitswapMessageResponse), HandlerEvent>;

/// Protocol Handler that manages a single long-lived substream with a peer.
pub struct BitswapHandler {
Expand Down Expand Up @@ -118,7 +110,7 @@ pub struct BitswapHandler {
upgrade_errors: VecDeque<StreamUpgradeError<BitswapHandlerError>>,

/// Flag determining whether to maintain the connection to the peer.
keep_alive: KeepAlive,
keep_alive: bool,
}

impl Debug for BitswapHandler {
Expand All @@ -145,8 +137,6 @@ impl Debug for BitswapHandler {

impl BitswapHandler {
/// Builds a new [`BitswapHandler`].
// TODO(WS1-1291): Remove uses of KeepAlive::Until
#[allow(deprecated)]
pub fn new(
remote_peer_id: PeerId,
protocol_config: ProtocolConfig,
Expand All @@ -161,7 +151,7 @@ impl BitswapHandler {
protocol: None,
idle_timeout,
upgrade_errors: VecDeque::new(),
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)),
keep_alive: true,
events: Default::default(),
}
}
Expand All @@ -170,7 +160,6 @@ impl BitswapHandler {
impl ConnectionHandler for BitswapHandler {
type FromBehaviour = BitswapHandlerIn;
type ToBehaviour = HandlerEvent;
type Error = BitswapHandlerError;
type InboundOpenInfo = ();
type InboundProtocol = ProtocolConfig;
type OutboundOpenInfo = (BitswapMessage, BitswapMessageResponse);
Expand All @@ -180,12 +169,10 @@ impl ConnectionHandler for BitswapHandler {
self.listen_protocol.clone()
}

fn connection_keep_alive(&self) -> KeepAlive {
fn connection_keep_alive(&self) -> bool {
self.keep_alive
}

// TODO(WS1-1291): Remove uses of KeepAlive::Until
#[allow(deprecated)]
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BitswapConnectionHandlerEvent> {
inc!(BitswapMetrics::HandlerPollCount);
if !self.events.is_empty() {
Expand All @@ -197,17 +184,20 @@ impl ConnectionHandler for BitswapHandler {
// Handle any upgrade errors
if let Some(error) = self.upgrade_errors.pop_front() {
inc!(BitswapMetrics::HandlerConnUpgradeErrors);
let reported_error = match error {
let error = match error {
StreamUpgradeError::Timeout => BitswapHandlerError::NegotiationTimeout,
StreamUpgradeError::Apply(e) => e,
StreamUpgradeError::NegotiationFailed => {
BitswapHandlerError::NegotiationProtocolError
}
StreamUpgradeError::Io(e) => e.into(),
};
debug!(%error, "connection upgrade failed");

// Close the connection
return Poll::Ready(ConnectionHandlerEvent::Close(reported_error));
// We no longer want to use this connection.
// Let the swarm close it if no one else is using it.
self.keep_alive = false;
return Poll::Pending;
}

// determine if we need to create the stream
Expand All @@ -227,7 +217,7 @@ impl ConnectionHandler for BitswapHandler {
if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) {
if let ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::Message { .. }) = event {
// Update keep alive as we have received a message
self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_timeout);
self.keep_alive = true;
}

return Poll::Ready(event);
Expand Down Expand Up @@ -282,35 +272,24 @@ impl ConnectionHandler for BitswapHandler {
self.upgrade_errors.push_back(err);
}

libp2p::swarm::handler::ConnectionEvent::ListenUpgradeError(_)
| libp2p::swarm::handler::ConnectionEvent::LocalProtocolsChange(_)
| libp2p::swarm::handler::ConnectionEvent::RemoteProtocolsChange(_) => {}
_ => {}
}
}

// TODO(WS1-1291): Remove uses of KeepAlive::Until
#[allow(deprecated)]
fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
match event {
BitswapHandlerIn::Message(m, response) => {
self.send_queue.push_back((m, response));

// sending a message, reset keepalive
self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_timeout);
}
BitswapHandlerIn::Protect => {
self.keep_alive = KeepAlive::Yes;
}
BitswapHandlerIn::Unprotect => {
self.keep_alive =
KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE));
// sending a message, ensure keep_alive is true
self.keep_alive = true
}
BitswapHandlerIn::Protect => self.keep_alive = true,
BitswapHandlerIn::Unprotect => self.keep_alive = false,
}
}
}

// TODO(WS1-1344): Remove uses of ConnectionHandlerEvent::Close
#[allow(deprecated)]
#[tracing::instrument(skip(substream))]
fn inbound_substream(
// Include remote_peer_id for tracing context only
Expand All @@ -330,10 +309,8 @@ fn inbound_substream(
}
_ => {
warn!(%err, "inbound stream error");
// More serious errors, close this side of the stream. If the
// peer is still around, they will re-establish their connection

yield ConnectionHandlerEvent::Close(err);
// Stop using the connection, if we are the last protocol using the
// connection then it will be closed.
break;
}
}
Expand Down
37 changes: 16 additions & 21 deletions beetle/iroh-bitswap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use ceramic_metrics::{bitswap::BitswapMetrics, inc, record};
use cid::Cid;
use handler::{BitswapHandler, HandlerEvent};
use libp2p::swarm::ConnectionId;
use libp2p::swarm::{
CloseConnection, DialError, NetworkBehaviour, NotifyHandler, PollParameters, ToSwarm,
};
use libp2p::swarm::{CloseConnection, DialError, NetworkBehaviour, NotifyHandler, ToSwarm};
use libp2p::{swarm::dial_opts::DialOpts, StreamProtocol};
use libp2p::{Multiaddr, PeerId};
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -404,7 +402,7 @@ impl<S: Store> NetworkBehaviour for Bitswap<S> {
type ConnectionHandler = BitswapHandler;
type ToSwarm = BitswapEvent;

fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) {
match event {
libp2p::swarm::FromSwarm::ConnectionEstablished(event) => {
trace!(
Expand Down Expand Up @@ -472,7 +470,6 @@ impl<S: Store> NetworkBehaviour for Bitswap<S> {
fn poll(
&mut self,
cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
inc!(BitswapMetrics::ToSwarmPollTick);
// limit work
Expand Down Expand Up @@ -620,9 +617,9 @@ mod tests {
use libp2p::yamux;
use libp2p::{core::muxing::StreamMuxerBox, swarm};
use libp2p::{noise, PeerId, Swarm, Transport};
use test_log::test;
use tokio::sync::{mpsc, RwLock};
use tracing::{info, trace};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

use super::*;
use crate::Block;
Expand Down Expand Up @@ -699,57 +696,54 @@ mod tests {
}
}

#[tokio::test]
#[test(tokio::test)]
async fn test_get_1_block() {
get_block::<1>().await;
}

#[tokio::test]
#[test(tokio::test)]
async fn test_get_2_block() {
get_block::<2>().await;
}

#[tokio::test]
#[test(tokio::test)]
async fn test_get_4_block() {
get_block::<4>().await;
}

#[tokio::test]
#[test(tokio::test)]
async fn test_get_64_block() {
get_block::<64>().await;
}

#[tokio::test]
#[test(tokio::test)]
async fn test_get_65_block() {
get_block::<65>().await;
}

#[tokio::test]
#[test(tokio::test)]
async fn test_get_66_block() {
get_block::<66>().await;
}

#[tokio::test]
#[test(tokio::test)]
async fn test_get_128_block() {
tracing_subscriber::registry()
.with(fmt::layer().pretty())
.with(EnvFilter::from_default_env())
.init();

get_block::<128>().await;
}

#[tokio::test]
#[test(tokio::test)]
async fn test_get_1024_block() {
get_block::<1024>().await;
}

async fn get_block<const N: usize>() {
info!("get_block");
let (peer1_id, trans) = mk_transport();
let store1 = TestStore::default();
let bs1 = Bitswap::new(peer1_id, store1.clone(), Config::default()).await;

let config = swarm::Config::with_tokio_executor();
let config = swarm::Config::with_tokio_executor()
.with_idle_connection_timeout(Duration::from_secs(5));
let mut swarm1 = Swarm::new(trans, bs1, peer1_id, config);
let blocks = (0..N).map(|_| create_random_block_v1()).collect::<Vec<_>>();

Expand Down Expand Up @@ -783,7 +777,8 @@ mod tests {
let store2 = TestStore::default();
let bs2 = Bitswap::new(peer2_id, store2.clone(), Config::default()).await;

let config = swarm::Config::with_tokio_executor();
let config = swarm::Config::with_tokio_executor()
.with_idle_connection_timeout(Duration::from_secs(5));
let mut swarm2 = Swarm::new(trans, bs2, peer2_id, config);

let swarm2_bs = swarm2.behaviour().clone();
Expand Down
1 change: 1 addition & 0 deletions one/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ tracing-appender = "0.2.2"
tracing-subscriber.workspace = true
tracing.workspace = true


[features]
default = []
tokio-console = ["ceramic-metrics/tokio-console"]
Expand Down
1 change: 0 additions & 1 deletion one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ impl Daemon {
let info = Info::new().await?;

let mut metrics_config = MetricsConfig {
// Do not push metrics to any endpoint.
export: opts.metrics,
tracing: opts.tracing,
log_format: match opts.log_format {
Expand Down
5 changes: 3 additions & 2 deletions one/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use libp2p::identity::Keypair;
use recon::{libp2p::Recon, Sha256a};
use sqlx::SqlitePool;
use tokio::task::{self, JoinHandle};
use tracing::error;
use tracing::{debug, error};

/// Builder provides an ordered API for constructing an Ipfs service.
pub struct Builder<S: BuilderState> {
Expand Down Expand Up @@ -56,8 +56,9 @@ impl Builder<Init> {

let task = task::spawn(async move {
if let Err(err) = p2p.run().await {
error!("{:?}", err);
error!(%err, "failed to gracefully stop p2p task");
}
debug!("node task finished");
});

Ok(Builder {
Expand Down
Loading

0 comments on commit 9a8c017

Please sign in to comment.