diff --git a/CHANGELOG.md b/CHANGELOG.md index 45f839e15..8a0c87ef5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ # 0.3.17 [unreleased] +- chore: Implement functions to add session to bitswap [PR 83] - refactor: Use channels instead of Subscription [PR 82] - feat: Ability to add custom behaviour [PR 81] - feat: Implement Ipfs::connection_events [PR 80] @@ -26,6 +27,7 @@ [PR 80]: https://github.com/dariusc93/rust-ipfs/pull/80 [PR 81]: https://github.com/dariusc93/rust-ipfs/pull/81 [PR 82]: https://github.com/dariusc93/rust-ipfs/pull/82 +[PR 83]: https://github.com/dariusc93/rust-ipfs/pull/83 # 0.3.16 - fix: Return events from gossipsub stream [PR 68] diff --git a/src/dag.rs b/src/dag.rs index 24b33b778..9b692149a 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -225,6 +225,17 @@ impl IpldDag { path: IpfsPath, providers: &[PeerId], local_only: bool, + ) -> Result { + self.get_with_session(None, path, providers, local_only) + .await + } + + pub(crate) async fn get_with_session( + &self, + session: Option, + path: IpfsPath, + providers: &[PeerId], + local_only: bool, ) -> Result { let resolved_path = self .ipfs @@ -240,7 +251,7 @@ impl IpldDag { let mut iter = resolved_path.iter().peekable(); let (node, _) = match self - .resolve0(cid, &mut iter, true, providers, local_only) + .resolve0(session, cid, &mut iter, true, providers, local_only) .await { Ok(t) => t, @@ -270,6 +281,18 @@ impl IpldDag { follow_links: bool, providers: &[PeerId], local_only: bool, + ) -> Result<(ResolvedNode, SlashedPath), ResolveError> { + self.resolve_with_session(None, path, follow_links, providers, local_only) + .await + } + + pub(crate) async fn resolve_with_session( + &self, + session: Option, + path: IpfsPath, + follow_links: bool, + providers: &[PeerId], + local_only: bool, ) -> Result<(ResolvedNode, SlashedPath), ResolveError> { let resolved_path = self .ipfs @@ -285,7 +308,7 @@ impl IpldDag { let (node, matched_segments) = { let mut iter = resolved_path.iter().peekable(); match self - .resolve0(cid, &mut iter, follow_links, providers, local_only) + .resolve0(session, cid, &mut iter, follow_links, providers, local_only) .await { Ok(t) => t, @@ -307,6 +330,7 @@ impl IpldDag { /// Return the node where the resolving ended, and the **count** of segments matched. async fn resolve0<'a>( &self, + session: Option, cid: &Cid, segments: &mut Peekable>, follow_links: bool, @@ -315,7 +339,7 @@ impl IpldDag { ) -> Result<(ResolvedNode, usize), RawResolveLocalError> { use LocallyResolved::*; - let mut current = cid.to_owned(); + let mut current = *cid; let mut total = 0; let mut cache = None; @@ -324,7 +348,7 @@ impl IpldDag { let block = match self .ipfs .repo - .get_block(¤t, providers, local_only) + .get_block_with_session(session, ¤t, providers, local_only) .await { Ok(block) => block, diff --git a/src/lib.rs b/src/lib.rs index 4a4433e0b..7db9a3ec2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,6 +69,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, time::Duration, + sync::atomic::AtomicU64, }; use self::{ @@ -111,6 +112,8 @@ use libp2p::{ swarm::dial_opts::DialOpts, }; +pub(crate) static BITSWAP_ID: AtomicU64 = AtomicU64::new(1); + #[derive(Debug, Clone)] pub enum StoragePath { Disk(PathBuf), diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index aed470b13..d87ceb9d3 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -573,6 +573,8 @@ impl> Behaviour { self.addressbook.add_address(peer, addr); } + self.pubsub.add_explicit_peer(&peer); + // if let Some(bitswap) = self.bitswap.as_ref() { // let client = bitswap.client().clone(); // let server = bitswap.server().cloned(); diff --git a/src/repo/mod.rs b/src/repo/mod.rs index c14e7953b..56ac4057b 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -474,11 +474,23 @@ impl Repo { /// Retrives a block from the block store, or starts fetching it from the network and awaits /// until it has been fetched. + #[inline] pub async fn get_block( &self, cid: &Cid, peers: &[PeerId], local_only: bool, + ) -> Result { + self.get_block_with_session(None, cid, peers, local_only) + .await + } + + pub(crate) async fn get_block_with_session( + &self, + session: Option, + cid: &Cid, + peers: &[PeerId], + local_only: bool, ) -> Result { if let Some(block) = self.get_block_now(cid).await? { Ok(block) @@ -495,7 +507,7 @@ impl Repo { // and that is okay with us. self.events .clone() - .send(RepoEvent::WantBlock(None, *cid, peers.to_vec())) + .send(RepoEvent::WantBlock(session, *cid, peers.to_vec())) .await .ok(); diff --git a/src/task.rs b/src/task.rs index 4a2b1fd0b..4d5037c93 100644 --- a/src/task.rs +++ b/src/task.rs @@ -24,7 +24,6 @@ use tokio::task::JoinHandle; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, io, - sync::atomic::AtomicU64, time::Duration, }; @@ -70,9 +69,6 @@ use libp2p::{ swarm::SwarmEvent, }; -#[allow(dead_code)] -static BITSWAP_ID: AtomicU64 = AtomicU64::new(0); - /// Background task of `Ipfs` created when calling `UninitializedIpfs::start`. // The receivers are Fuse'd so that we don't have to manage state on them being exhausted. #[allow(clippy::type_complexity)] diff --git a/src/unixfs/cat.rs b/src/unixfs/cat.rs index 33a638b5d..9b906d9ed 100644 --- a/src/unixfs/cat.rs +++ b/src/unixfs/cat.rs @@ -23,6 +23,8 @@ pub async fn cat<'a>( providers: &'a [PeerId], local_only: bool, ) -> Result, TraversalFailed>> + Send + 'a, TraversalFailed> { + let session = Some(crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)); + let ipfs = ipfs.clone(); let mut visit = IdleFileVisit::default(); if let Some(range) = range { @@ -36,7 +38,7 @@ pub async fn cat<'a>( let borrow = ipfs.clone(); let dag = borrow.dag(); let (resolved, _) = dag - .resolve(path, true, providers, local_only) + .resolve_with_session(session, path, true, providers, local_only) .await .map_err(TraversalFailed::Resolving)?; resolved @@ -89,7 +91,7 @@ pub async fn cat<'a>( let (next, _) = visit.pending_links(); let borrow = ipfs.borrow(); - let block = match borrow.repo().get_block(next, providers, local_only).await { + let block = match borrow.repo().get_block_with_session(session, next, providers, local_only).await { Ok(block) => block, Err(e) => { yield Err(TraversalFailed::Loading(next.to_owned(), e)); diff --git a/src/unixfs/get.rs b/src/unixfs/get.rs index 48cc70e05..3397265f9 100644 --- a/src/unixfs/get.rs +++ b/src/unixfs/get.rs @@ -19,9 +19,11 @@ pub async fn get<'a, P: AsRef>( let mut file = tokio::fs::File::create(dest).await?; let ipfs = ipfs.clone(); + let session = Some(crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)); + let (resolved, _) = ipfs .dag() - .resolve(path.clone(), true, providers, local_only) + .resolve_with_session(session, path.clone(), true, providers, local_only) .await?; let block = resolved.into_unixfs_block()?; @@ -37,7 +39,7 @@ pub async fn get<'a, P: AsRef>( let mut written = 0; while walker.should_continue() { let (next, _) = walker.pending_links(); - let block = match ipfs.repo().get_block(next, providers, local_only).await { + let block = match ipfs.repo().get_block_with_session(session, next, providers, local_only).await { Ok(block) => block, Err(e) => { yield UnixfsStatus::FailedStatus { written, total_size, error: Some(anyhow::anyhow!("{e}")) }; diff --git a/src/unixfs/ls.rs b/src/unixfs/ls.rs index b2f0b3c92..65c773169 100644 --- a/src/unixfs/ls.rs +++ b/src/unixfs/ls.rs @@ -21,9 +21,11 @@ pub async fn ls<'a>( ) -> anyhow::Result> { let ipfs = ipfs.clone(); + let session = Some(crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)); + let (resolved, _) = ipfs .dag() - .resolve(path.clone(), true, providers, local_only) + .resolve_with_session(session, path.clone(), true, providers, local_only) .await?; let block = resolved.into_unixfs_block()?; @@ -38,7 +40,7 @@ pub async fn ls<'a>( let mut root_directory = String::new(); while walker.should_continue() { let (next, _) = walker.pending_links(); - let block = match ipfs.repo().get_block(next, providers, local_only).await { + let block = match ipfs.repo().get_block_with_session(session, next, providers, local_only).await { Ok(block) => block, Err(error) => { yield NodeItem::Error { error };