From b1d78f5c6cd2dc633b43ec841a33add8603f8a43 Mon Sep 17 00:00:00 2001 From: pasha Date: Mon, 9 Jan 2023 13:02:43 +0100 Subject: [PATCH 01/17] fix: Move block verifying down the street --- iroh-bitswap/src/lib.rs | 11 ++++++----- iroh-gateway/src/client.rs | 2 +- iroh-gateway/src/handlers.rs | 4 ++-- iroh-p2p/src/node.rs | 2 +- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/iroh-bitswap/src/lib.rs b/iroh-bitswap/src/lib.rs index ae1b3f1ba..ed814efda 100644 --- a/iroh-bitswap/src/lib.rs +++ b/iroh-bitswap/src/lib.rs @@ -146,7 +146,7 @@ impl Bitswap { }; let client = Client::new(network.clone(), store, cb, config.client).await; - let (sender_msg, mut receiver_msg) = mpsc::channel(2048); + let (sender_msg, mut receiver_msg) = mpsc::channel::<(PeerId, BitswapMessage)>(2048); let (sender_con, mut receiver_con) = mpsc::channel(2048); let (sender_dis, mut receiver_dis) = mpsc::channel(2048); @@ -157,7 +157,11 @@ impl Bitswap { async move { // process messages serially but without blocking the p2p loop - while let Some((peer, message)) = receiver_msg.recv().await { + while let Some((peer, mut message)) = receiver_msg.recv().await { + let message = tokio::task::spawn_blocking(move || { + message.verify_blocks(); + message + }).await.expect("cannot spawn blocking thread"); if let Some(ref server) = server { futures::future::join( client.receive_message(&peer, &message), @@ -494,10 +498,7 @@ impl NetworkBehaviour for Bitswap { mut message, protocol, } => { - // mark peer as responsive self.set_peer_state(&peer_id, PeerState::Responsive(connection, protocol)); - - message.verify_blocks(); self.receive_message(peer_id, message); } HandlerEvent::FailedToSendMessage { .. } => { diff --git a/iroh-gateway/src/client.rs b/iroh-gateway/src/client.rs index 3873a70a5..dd9ced9dc 100644 --- a/iroh-gateway/src/client.rs +++ b/iroh-gateway/src/client.rs @@ -114,7 +114,7 @@ impl Client { self.resolver.resolve(path).await.map_err(|e| e.to_string()) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(skip(self, path_metadata))] pub async fn get_file( &self, path: iroh_resolver::resolver::Path, diff --git a/iroh-gateway/src/handlers.rs b/iroh-gateway/src/handlers.rs index 73343ee2f..97473487e 100644 --- a/iroh-gateway/src/handlers.rs +++ b/iroh-gateway/src/handlers.rs @@ -717,7 +717,7 @@ async fn serve_car_recursive( Ok(GatewayResponse::new(StatusCode::OK, body, headers)) } -#[tracing::instrument()] +#[tracing::instrument(skip_all)] #[async_recursion] async fn serve_fs( req: &IpfsRequest, @@ -842,7 +842,7 @@ async fn serve_fs( } } -#[tracing::instrument()] +#[tracing::instrument(skip_all)] async fn serve_fs_dir( dir_list: &[Link], req: &IpfsRequest, diff --git a/iroh-p2p/src/node.rs b/iroh-p2p/src/node.rs index 7b9b128d0..df5a9e614 100644 --- a/iroh-p2p/src/node.rs +++ b/iroh-p2p/src/node.rs @@ -380,7 +380,7 @@ impl Node { let worker = tokio::task::spawn(async move { tokio::select! { _ = closer_r => { - // Explicit sesssion stop. + // Explicit session stop. debug!("session {}: stopped: closed", ctx); } _ = chan.closed() => { From 97dcf74ac4ce10f3b30c54090cd46dc10379c989 Mon Sep 17 00:00:00 2001 From: pasha Date: Fri, 16 Dec 2022 14:54:10 +0100 Subject: [PATCH 02/17] feat: Add pub ctor to Directory --- iroh-bitswap/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-bitswap/src/lib.rs b/iroh-bitswap/src/lib.rs index ed814efda..354d216c5 100644 --- a/iroh-bitswap/src/lib.rs +++ b/iroh-bitswap/src/lib.rs @@ -26,7 +26,7 @@ use libp2p::swarm::{ use libp2p::{Multiaddr, PeerId}; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; -use tracing::{debug, trace, warn}; +use tracing::{debug, info, trace, warn}; use self::client::{Client, Config as ClientConfig}; use self::message::BitswapMessage; From 7a5c6f18d18f633c239b99e68a28a3bfa505864f Mon Sep 17 00:00:00 2001 From: pasha Date: Wed, 28 Dec 2022 15:17:11 +0100 Subject: [PATCH 03/17] fix: reduce logging level --- iroh-bitswap/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-bitswap/src/lib.rs b/iroh-bitswap/src/lib.rs index 354d216c5..5cf5076e3 100644 --- a/iroh-bitswap/src/lib.rs +++ b/iroh-bitswap/src/lib.rs @@ -306,7 +306,7 @@ impl Bitswap { record!(BitswapMetrics::MessageBytesIn, message.encoded_len() as u64); // TODO: Handle backpressure properly if let Err(err) = self.incoming_messages.try_send((peer, message)) { - warn!( + info!( "failed to receive message from {}: {:?}, dropping", peer, err ); From 88d2b67d6d0e849f681e64a63c6437bc5064a564 Mon Sep 17 00:00:00 2001 From: pasha Date: Fri, 23 Dec 2022 21:17:02 +0100 Subject: [PATCH 04/17] feat: Pretty printing for addrs --- iroh-bitswap/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-bitswap/src/lib.rs b/iroh-bitswap/src/lib.rs index 5cf5076e3..178dd5024 100644 --- a/iroh-bitswap/src/lib.rs +++ b/iroh-bitswap/src/lib.rs @@ -306,7 +306,7 @@ impl Bitswap { record!(BitswapMetrics::MessageBytesIn, message.encoded_len() as u64); // TODO: Handle backpressure properly if let Err(err) = self.incoming_messages.try_send((peer, message)) { - info!( + debug!( "failed to receive message from {}: {:?}, dropping", peer, err ); From e9ae241f54cdc4c7cce9328a4de8e960517af186 Mon Sep 17 00:00:00 2001 From: pasha Date: Wed, 28 Dec 2022 15:40:24 +0100 Subject: [PATCH 05/17] fix: clippy --- iroh-bitswap/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-bitswap/src/lib.rs b/iroh-bitswap/src/lib.rs index 178dd5024..674d30d86 100644 --- a/iroh-bitswap/src/lib.rs +++ b/iroh-bitswap/src/lib.rs @@ -26,7 +26,7 @@ use libp2p::swarm::{ use libp2p::{Multiaddr, PeerId}; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, trace, warn}; use self::client::{Client, Config as ClientConfig}; use self::message::BitswapMessage; From 9a682ef71b265d7a670fbb7f8c9b490008cb18d3 Mon Sep 17 00:00:00 2001 From: pasha Date: Thu, 5 Jan 2023 13:01:52 +0100 Subject: [PATCH 06/17] pr fixes --- iroh-bitswap/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-bitswap/src/lib.rs b/iroh-bitswap/src/lib.rs index 674d30d86..ed814efda 100644 --- a/iroh-bitswap/src/lib.rs +++ b/iroh-bitswap/src/lib.rs @@ -306,7 +306,7 @@ impl Bitswap { record!(BitswapMetrics::MessageBytesIn, message.encoded_len() as u64); // TODO: Handle backpressure properly if let Err(err) = self.incoming_messages.try_send((peer, message)) { - debug!( + warn!( "failed to receive message from {}: {:?}, dropping", peer, err ); From 550b488aa4f9f02d939a7caccc1669c822246000 Mon Sep 17 00:00:00 2001 From: pasha Date: Mon, 9 Jan 2023 08:59:23 +0100 Subject: [PATCH 07/17] fix: Proper path when redirect to index.html --- iroh-gateway/src/handlers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-gateway/src/handlers.rs b/iroh-gateway/src/handlers.rs index 97473487e..97aea121c 100644 --- a/iroh-gateway/src/handlers.rs +++ b/iroh-gateway/src/handlers.rs @@ -861,7 +861,7 @@ async fn serve_fs_dir( if !force_dir && has_index { if !req.resolved_path.has_trailing_slash() { let redirect_path = format!( - "{}/{}", + "{}{}", req.request_path_for_redirection(), req.query_params.to_query_string() ); From d4fbcf5cea864de15dca5619a7b8b5948a290801 Mon Sep 17 00:00:00 2001 From: pasha Date: Mon, 9 Jan 2023 18:45:58 +0100 Subject: [PATCH 08/17] fix: Set base url for styles --- iroh-gateway/assets/404.html | 4 ++-- iroh-gateway/assets/dir_list.html | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/iroh-gateway/assets/404.html b/iroh-gateway/assets/404.html index e2d50089e..d49656a96 100644 --- a/iroh-gateway/assets/404.html +++ b/iroh-gateway/assets/404.html @@ -21,8 +21,8 @@ - - + + {{ root_path }} diff --git a/iroh-gateway/assets/dir_list.html b/iroh-gateway/assets/dir_list.html index 1a3b2ace0..3bbf98332 100644 --- a/iroh-gateway/assets/dir_list.html +++ b/iroh-gateway/assets/dir_list.html @@ -18,8 +18,8 @@ - - + + {{ root_path }} From e2c03241d0d8f46787539b6fa42cc364348415bf Mon Sep 17 00:00:00 2001 From: pasha Date: Wed, 11 Jan 2023 14:43:34 +0100 Subject: [PATCH 09/17] fix: Process index.html correctly --- iroh-gateway/src/client.rs | 86 +++++++++++++++++++------------- iroh-gateway/src/handlers.rs | 75 +++++++++++----------------- iroh-gateway/src/ipfs_request.rs | 31 ++++++++++++ iroh-gateway/src/lib.rs | 1 + iroh-resolver/src/resolver.rs | 6 +++ 5 files changed, 117 insertions(+), 82 deletions(-) create mode 100644 iroh-gateway/src/ipfs_request.rs diff --git a/iroh-gateway/src/client.rs b/iroh-gateway/src/client.rs index dd9ced9dc..c1449438e 100644 --- a/iroh-gateway/src/client.rs +++ b/iroh-gateway/src/client.rs @@ -6,7 +6,7 @@ use anyhow::Result; use bytes::Bytes; use cid::Cid; use futures::{StreamExt, TryStream}; -use http::HeaderMap; +use http::{HeaderMap, StatusCode}; use iroh_car::{CarHeader, CarWriter}; use iroh_metrics::{ core::{MObserver, MRecorder}, @@ -15,15 +15,18 @@ use iroh_metrics::{ resolver::OutMetrics, }; use iroh_resolver::dns_resolver::Config; -use iroh_resolver::resolver::{CidOrDomain, Metadata, Out, OutPrettyReader, OutType, Resolver}; +use iroh_resolver::resolver::{Metadata, Out, OutPrettyReader, OutType, Resolver}; use iroh_unixfs::{content_loader::ContentLoader, Source}; use mime::Mime; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWrite}; use tokio_util::io::ReaderStream; use tracing::{info, warn}; +use crate::constants::RECURSION_LIMIT; +use crate::error::GatewayError; +use crate::handler_params::GetParams; +use crate::ipfs_request::IpfsRequest; use crate::response::ResponseFormat; -use crate::{constants::RECURSION_LIMIT, handler_params::GetParams}; #[derive(Debug, Clone)] pub struct Client { @@ -95,23 +98,56 @@ impl Client { } } + #[tracing::instrument(skip(self))] + pub async fn build_ipfs_request( + &self, + path: &iroh_resolver::resolver::Path, + query_params: &GetParams, + format: ResponseFormat, + subdomain_mode: bool, + ) -> Result { + info!("build ipfs request {}", path); + let path_metadata = match self + .retrieve_path_metadata(path.clone(), format == ResponseFormat::Raw) + .await + { + Ok(metadata) => metadata, + Err(e) => { + if e == "offline" { + return Err(GatewayError::new(StatusCode::SERVICE_UNAVAILABLE, &e)); + } else if e.starts_with("failed to find") { + return Err(GatewayError::new(StatusCode::NOT_FOUND, &e)); + } else { + return Err(GatewayError::new(StatusCode::INTERNAL_SERVER_ERROR, &e)); + } + } + }; + Ok(IpfsRequest { + format, + cid: path.root().clone(), + resolved_path: path.clone(), + query_params: query_params.clone(), + subdomain_mode, + path_metadata, + }) + } + #[tracing::instrument(skip(self))] pub async fn retrieve_path_metadata( &self, path: iroh_resolver::resolver::Path, - format: Option, + raw_format: bool, ) -> Result { info!("retrieve path metadata {}", path); - if let Some(f) = format { - if f == ResponseFormat::Raw { - return self - .resolver - .resolve_raw(path) - .await - .map_err(|e| e.to_string()); - } + if raw_format { + return self + .resolver + .resolve_raw(path) + .await + .map_err(|e| e.to_string()); + } else { + self.resolver.resolve(path).await.map_err(|e| e.to_string()) } - self.resolver.resolve(path).await.map_err(|e| e.to_string()) } #[tracing::instrument(skip(self, path_metadata))] @@ -126,7 +162,7 @@ impl Client { let path_metadata = if let Some(path_metadata) = path_metadata { path_metadata } else { - self.retrieve_path_metadata(path.clone(), None).await? + self.retrieve_path_metadata(path.clone(), false).await? }; let metadata = path_metadata.metadata().clone(); record_ttfb_metrics(start_time, &metadata.source); @@ -241,28 +277,6 @@ impl Client { } } -#[derive(Debug, Clone)] -pub struct IpfsRequest { - pub format: ResponseFormat, - pub cid: CidOrDomain, - pub resolved_path: iroh_resolver::resolver::Path, - pub query_file_name: String, - pub download: bool, - pub query_params: GetParams, - pub subdomain_mode: bool, - pub path_metadata: Out, -} - -impl IpfsRequest { - pub fn request_path_for_redirection(&self) -> String { - if self.subdomain_mode { - self.resolved_path.to_relative_string() - } else { - self.resolved_path.to_string() - } - } -} - async fn fetch_car_recursive( resolver: &Resolver, path: iroh_resolver::resolver::Path, diff --git a/iroh-gateway/src/handlers.rs b/iroh-gateway/src/handlers.rs index 97aea121c..c57a601fe 100644 --- a/iroh-gateway/src/handlers.rs +++ b/iroh-gateway/src/handlers.rs @@ -42,9 +42,10 @@ use crate::handler_params::{ inlined_dns_link_to_dns_link, recode_path_to_inlined_dns_link, DefaultHandlerPathParams, GetParams, SubdomainHandlerPathParams, }; +use crate::ipfs_request::IpfsRequest; use crate::text::IpfsSubdomain; use crate::{ - client::{FileResult, IpfsRequest}, + client::FileResult, constants::*, core::State, error::GatewayError, @@ -183,25 +184,12 @@ async fn request_preprocessing( // parse query params let format = get_response_format(request_headers, &query_params.format) .map_err(|err| GatewayError::new(StatusCode::BAD_REQUEST, &err))?; - - let path_metadata = match state + let ipfs_request = state .client - .retrieve_path_metadata(path.clone(), Some(format.clone())) - .await - { - Ok(metadata) => metadata, - Err(e) => { - if e == "offline" { - return Err(GatewayError::new(StatusCode::SERVICE_UNAVAILABLE, &e)); - } else if e.starts_with("failed to find") { - return Err(GatewayError::new(StatusCode::NOT_FOUND, &e)); - } else { - return Err(GatewayError::new(StatusCode::INTERNAL_SERVER_ERROR, &e)); - } - } - }; + .build_ipfs_request(path, query_params, format.clone(), subdomain_mode) + .await?; - let resolved_cid = path_metadata.metadata().resolved_path.last(); + let resolved_cid = ipfs_request.path_metadata.metadata().resolved_path.last(); let resolved_cid = match resolved_cid { Some(cid) => cid, None => { @@ -243,22 +231,9 @@ async fn request_preprocessing( }; response_headers.insert(&HEADER_X_IPFS_PATH, hv); - // handle request and fetch data - let req = IpfsRequest { - format, - cid: path.root().clone(), - resolved_path: path.clone(), - query_file_name: query_params - .filename - .as_deref() - .unwrap_or_default() - .to_string(), - download: query_params.download.unwrap_or_default(), - query_params: query_params.clone(), - subdomain_mode, - path_metadata, - }; - Ok(RequestPreprocessingResult::ShouldRequestData(Box::new(req))) + Ok(RequestPreprocessingResult::ShouldRequestData(Box::new( + ipfs_request, + ))) } pub async fn handler( @@ -604,9 +579,9 @@ async fn serve_raw( match body { FileResult::File(body) | FileResult::Raw(body) => { - let file_name = match req.query_file_name.is_empty() { + let file_name = match req.query_file_name().is_empty() { true => format!("{}.bin", req.cid), - false => req.query_file_name.clone(), + false => req.query_file_name().to_string(), }; set_content_disposition_headers(&mut headers, &file_name, DISPOSITION_ATTACHMENT); @@ -662,9 +637,9 @@ async fn serve_car( match body { FileResult::File(body) | FileResult::Raw(body) => { - let file_name = match req.query_file_name.is_empty() { + let file_name = match req.query_file_name().is_empty() { true => format!("{}.car", req.cid), - false => req.query_file_name.clone(), + false => req.query_file_name().to_string(), }; set_content_disposition_headers(&mut headers, &file_name, DISPOSITION_ATTACHMENT); @@ -700,9 +675,9 @@ async fn serve_car_recursive( .await .map_err(|e| GatewayError::new(StatusCode::INTERNAL_SERVER_ERROR, &e))?; - let file_name = match req.query_file_name.is_empty() { + let file_name = match req.query_file_name().is_empty() { true => format!("{}.car", req.cid), - false => req.query_file_name.clone(), + false => req.query_file_name().to_string(), }; set_content_disposition_headers(&mut headers, &file_name, DISPOSITION_ATTACHMENT); @@ -783,9 +758,9 @@ async fn serve_fs( } let name = add_content_disposition_headers( &mut headers, - &req.query_file_name, + req.query_file_name(), &req.resolved_path, - req.download, + req.query_download(), ); if metadata.unixfs_type == Some(UnixfsType::Symlink) { headers.insert( @@ -831,9 +806,9 @@ async fn serve_fs( } let name = add_content_disposition_headers( &mut headers, - &req.query_file_name, + req.query_file_name(), &req.resolved_path, - req.download, + req.query_params.download.unwrap_or_default(), ); let content_sniffed_mime = body.get_mime(); add_content_type_headers(&mut headers, &name, content_sniffed_mime); @@ -867,8 +842,16 @@ async fn serve_fs_dir( ); return Ok(GatewayResponse::redirect_permanently(&redirect_path)); } - let mut new_req = req.clone(); - new_req.resolved_path.push("index.html"); + let modified_path = req.resolved_path.with_suffix("index.html"); + let new_req = state + .client + .build_ipfs_request( + &modified_path, + &req.query_params, + req.format.clone(), + req.subdomain_mode, + ) + .await?; return serve_fs(&new_req, state, headers, http_req, start_time).await; } diff --git a/iroh-gateway/src/ipfs_request.rs b/iroh-gateway/src/ipfs_request.rs new file mode 100644 index 000000000..f158fe3a4 --- /dev/null +++ b/iroh-gateway/src/ipfs_request.rs @@ -0,0 +1,31 @@ +use crate::handler_params::GetParams; +use crate::response::ResponseFormat; +use iroh_resolver::resolver::{CidOrDomain, Out}; + +#[derive(Debug)] +pub struct IpfsRequest { + pub format: ResponseFormat, + pub cid: CidOrDomain, + pub resolved_path: iroh_resolver::resolver::Path, + pub query_params: GetParams, + pub subdomain_mode: bool, + pub path_metadata: Out, +} + +impl IpfsRequest { + pub fn request_path_for_redirection(&self) -> String { + if self.subdomain_mode { + self.resolved_path.to_relative_string() + } else { + self.resolved_path.to_string() + } + } + + pub fn query_file_name(&self) -> &str { + self.query_params.filename.as_deref().unwrap_or_default() + } + + pub fn query_download(&self) -> bool { + self.query_params.download.unwrap_or_default() + } +} diff --git a/iroh-gateway/src/lib.rs b/iroh-gateway/src/lib.rs index 76d53efed..03f7816bd 100644 --- a/iroh-gateway/src/lib.rs +++ b/iroh-gateway/src/lib.rs @@ -9,6 +9,7 @@ mod error; pub mod handler_params; pub mod handlers; pub mod headers; +mod ipfs_request; pub mod metrics; pub mod response; mod rpc; diff --git a/iroh-resolver/src/resolver.rs b/iroh-resolver/src/resolver.rs index 98aa59719..e7b4b3e0d 100644 --- a/iroh-resolver/src/resolver.rs +++ b/iroh-resolver/src/resolver.rs @@ -125,6 +125,12 @@ impl Path { self.tail.push(str.as_ref().to_owned()); } + pub fn with_suffix(&self, suffix: impl AsRef) -> Self { + let mut suffixed = self.clone(); + suffixed.push(suffix); + suffixed + } + // Empty path segments in the *middle* shouldn't occur, // though they can occur at the end, which `join` handles. // TODO(faassen): it would make sense to return a `RelativePathBuf` here at some From 4bb196449246b08b83132cf87bcb7ab69d92ce6e Mon Sep 17 00:00:00 2001 From: pasha Date: Mon, 9 Jan 2023 10:40:06 +0100 Subject: [PATCH 10/17] feat: Add extra files in DirectoryBuilder --- iroh-unixfs/src/builder.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/iroh-unixfs/src/builder.rs b/iroh-unixfs/src/builder.rs index 89b574d78..cdc866076 100644 --- a/iroh-unixfs/src/builder.rs +++ b/iroh-unixfs/src/builder.rs @@ -559,6 +559,12 @@ impl DirectoryBuilder { Ok(if let Some(path) = path { let mut dir = make_dir_from_path(path, chunker.clone(), degree).await?; + match &mut dir { + Directory::Basic(basic) => { + basic.entries.extend(entries) + } + Directory::Hamt(_) => unimplemented!() + } if let Some(name) = name { dir.set_name(name); } From 428482960a2f0ae58e70c5931b4d681438ef9171 Mon Sep 17 00:00:00 2001 From: pasha Date: Mon, 9 Jan 2023 11:21:03 +0100 Subject: [PATCH 11/17] feat: Support adding raw blocks in Builder --- iroh-api/src/api.rs | 3 +++ iroh-unixfs/src/builder.rs | 45 ++++++++++++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/iroh-api/src/api.rs b/iroh-api/src/api.rs index 947dd786c..3d7402ab6 100644 --- a/iroh-api/src/api.rs +++ b/iroh-api/src/api.rs @@ -186,6 +186,9 @@ impl Api { UnixfsEntry::Symlink(s) => Box::pin(async_stream::try_stream! { yield s.encode()? }), + UnixfsEntry::RawBlock(r) => Box::pin(async_stream::try_stream! { + yield r.encode()? + }), }; Ok(Box::pin( diff --git a/iroh-unixfs/src/builder.rs b/iroh-unixfs/src/builder.rs index cdc866076..c703991c5 100644 --- a/iroh-unixfs/src/builder.rs +++ b/iroh-unixfs/src/builder.rs @@ -277,6 +277,37 @@ impl Symlink { } } +/// Representation of a raw block +#[derive(Debug, PartialEq, Eq)] +pub struct RawBlock { + name: String, + block: Block, +} + +impl RawBlock { + pub fn new(name: &str, block: Block) -> Self { + RawBlock { + name: name.to_string(), + block, + } + } + pub fn name(&self) -> &str { + &self.name + } + + pub fn into_block(self) -> Block { + self.block + } + + pub fn wrap(self) -> Directory { + Directory::single("".into(), Entry::RawBlock(self)) + } + + pub fn encode(self) -> Result { + Ok(self.into_block()) + } +} + /// Constructs a UnixFS file. pub struct FileBuilder { name: Option, @@ -408,6 +439,7 @@ pub enum Entry { File(File), Directory(Directory), Symlink(Symlink), + RawBlock(RawBlock), } impl Entry { @@ -416,6 +448,7 @@ impl Entry { Entry::File(f) => f.name(), Entry::Directory(d) => d.name(), Entry::Symlink(s) => s.name(), + Entry::RawBlock(r) => r.name(), } } @@ -424,6 +457,7 @@ impl Entry { Entry::File(f) => f.encode().await?.boxed(), Entry::Directory(d) => d.encode(), Entry::Symlink(s) => stream::iter(Some(s.encode())).boxed(), + Entry::RawBlock(r) => stream::iter(Some(r.encode())).boxed(), }) } @@ -469,6 +503,7 @@ impl Entry { Entry::File(f) => f.wrap(), Entry::Directory(d) => d.wrap(), Entry::Symlink(s) => s.wrap(), + Entry::RawBlock(r) => r.wrap(), } } } @@ -535,6 +570,10 @@ impl DirectoryBuilder { self.entry(Entry::File(file)) } + pub fn add_raw_block(self, raw_block: RawBlock) -> Self { + self.entry(Entry::RawBlock(raw_block)) + } + pub fn add_symlink(self, symlink: Symlink) -> Self { self.entry(Entry::Symlink(symlink)) } @@ -560,10 +599,8 @@ impl DirectoryBuilder { Ok(if let Some(path) = path { let mut dir = make_dir_from_path(path, chunker.clone(), degree).await?; match &mut dir { - Directory::Basic(basic) => { - basic.entries.extend(entries) - } - Directory::Hamt(_) => unimplemented!() + Directory::Basic(basic) => basic.entries.extend(entries), + Directory::Hamt(_) => unimplemented!(), } if let Some(name) = name { dir.set_name(name); From 3fdea2fdea80fc9a5463332f1b6d042dbbe9ea94 Mon Sep 17 00:00:00 2001 From: pasha Date: Wed, 11 Jan 2023 16:10:59 +0100 Subject: [PATCH 12/17] feat: Refactor `DirectoryBuilder` --- iroh-unixfs/src/builder.rs | 147 +++++++++++++++++++------------------ 1 file changed, 77 insertions(+), 70 deletions(-) diff --git a/iroh-unixfs/src/builder.rs b/iroh-unixfs/src/builder.rs index c703991c5..ff339883c 100644 --- a/iroh-unixfs/src/builder.rs +++ b/iroh-unixfs/src/builder.rs @@ -467,7 +467,8 @@ impl Entry { let chunker = chunker_config.into(); let dir = DirectoryBuilder::new() .chunker(chunker) - .path(path) + .add_path(path) + .await? .build() .await?; Entry::Directory(dir) @@ -516,7 +517,6 @@ pub struct DirectoryBuilder { typ: DirectoryType, chunker: Chunker, degree: usize, - path: Option, } impl Default for DirectoryBuilder { @@ -527,7 +527,6 @@ impl Default for DirectoryBuilder { typ: DirectoryType::Basic, chunker: Chunker::Fixed(chunker::Fixed::default()), degree: DEFAULT_DEGREE, - path: None, } } } @@ -547,11 +546,6 @@ impl DirectoryBuilder { self } - pub fn path(mut self, path: &Path) -> Self { - self.path = Some(path.into()); - self - } - pub fn chunker(mut self, chunker: Chunker) -> Self { self.chunker = chunker; self @@ -563,22 +557,22 @@ impl DirectoryBuilder { } pub fn add_dir(self, dir: Directory) -> Result { - Ok(self.entry(Entry::Directory(dir))) + Ok(self.add_entry(Entry::Directory(dir))) } pub fn add_file(self, file: File) -> Self { - self.entry(Entry::File(file)) + self.add_entry(Entry::File(file)) } pub fn add_raw_block(self, raw_block: RawBlock) -> Self { - self.entry(Entry::RawBlock(raw_block)) + self.add_entry(Entry::RawBlock(raw_block)) } pub fn add_symlink(self, symlink: Symlink) -> Self { - self.entry(Entry::Symlink(symlink)) + self.add_entry(Entry::Symlink(symlink)) } - fn entry(mut self, entry: Entry) -> Self { + pub fn add_entry(mut self, entry: Entry) -> Self { if self.typ == DirectoryType::Basic && self.entries.len() >= DIRECTORY_LINK_LIMIT { self.typ = DirectoryType::Hamt } @@ -586,38 +580,38 @@ impl DirectoryBuilder { self } + pub fn add_entries(mut self, entries: impl Iterator) -> Self { + for entry in entries { + self = self.add_entry(entry); + } + self + } + + pub async fn add_path(self, path: impl Into) -> Result { + let chunker = self.chunker.clone(); + let degree = self.degree.clone(); + Ok(self.add_entries( + make_entries_from_path(path, chunker, degree) + .await? + .into_iter(), + )) + } + pub async fn build(self) -> Result { let DirectoryBuilder { - name, - entries, - typ, - path, - chunker, - degree, + name, entries, typ, .. } = self; - Ok(if let Some(path) = path { - let mut dir = make_dir_from_path(path, chunker.clone(), degree).await?; - match &mut dir { - Directory::Basic(basic) => basic.entries.extend(entries), - Directory::Hamt(_) => unimplemented!(), - } - if let Some(name) = name { - dir.set_name(name); - } - dir - } else { - let name = name.unwrap_or_default(); - match typ { - DirectoryType::Basic => Directory::Basic(BasicDirectory { name, entries }), - DirectoryType::Hamt => { - let hamt = HamtNode::new(entries) - .context("unable to build hamt. Probably a hash collision.")?; - Directory::Hamt(HamtDirectory { - name, - hamt: Box::new(hamt), - }) - } + let name = name.unwrap_or_default(); + Ok(match typ { + DirectoryType::Basic => Directory::Basic(BasicDirectory { name, entries }), + DirectoryType::Hamt => { + let hamt = HamtNode::new(entries) + .context("unable to build hamt. Probably a hash collision.")?; + Directory::Hamt(HamtDirectory { + name, + hamt: Box::new(hamt), + }) } }) } @@ -786,40 +780,47 @@ pub struct Config { } #[async_recursion(?Send)] -async fn make_dir_from_path>( +async fn make_entries_from_path>( path: P, chunker: Chunker, degree: usize, -) -> Result { - let path = path.into(); - let mut dir = DirectoryBuilder::new().name( - path.file_name() - .and_then(|s| s.to_str()) - .unwrap_or_default(), - ); - - let mut directory_reader = tokio::fs::read_dir(path.clone()).await?; +) -> Result> { + let mut directory_reader = tokio::fs::read_dir(path.into()).await?; + let mut entries = vec![]; while let Some(entry) = directory_reader.next_entry().await? { let path = entry.path(); if path.is_symlink() { - let s = SymlinkBuilder::new(path).build().await?; - dir = dir.add_symlink(s); + entries.push(Entry::Symlink(SymlinkBuilder::new(path).build().await?)) } else if path.is_file() { - let f = FileBuilder::new() - .chunker(chunker.clone()) - .degree(degree) - .path(path) - .build() - .await?; - dir = dir.add_file(f); + entries.push(Entry::File( + FileBuilder::new() + .chunker(chunker.clone()) + .degree(degree) + .path(path) + .build() + .await?, + )); } else if path.is_dir() { - let d = make_dir_from_path(path, chunker.clone(), degree).await?; - dir = dir.add_dir(d)?; + entries.push(Entry::Directory( + DirectoryBuilder::new() + .name( + path.file_name() + .and_then(|s| s.to_str()) + .unwrap_or_default(), + ) + .add_entries( + make_entries_from_path(path, chunker.clone(), degree) + .await? + .into_iter(), + ) + .build() + .await?, + )) } else { - anyhow::bail!("directory entry is neither file nor directory") + anyhow::bail!("directory entry is neither file nor directory nor symlink") } } - dir.build().await + Ok(entries) } #[cfg(test)] @@ -1119,12 +1120,18 @@ mod tests { vec![Entry::File(file), Entry::Directory(nested_dir)], ); - let got = make_dir_from_path( - dir, - Chunker::Fixed(chunker::Fixed::default()), - DEFAULT_DEGREE, - ) - .await?; + let got = DirectoryBuilder::new() + .add_entries( + make_entries_from_path( + dir, + Chunker::Fixed(chunker::Fixed::default()), + DEFAULT_DEGREE, + ) + .await? + .into_iter(), + ) + .build() + .await?; let basic_entries = |dir: Directory| match dir { Directory::Basic(basic) => basic.entries, From 09d568eb538bbbc91a661c909b2bd5af542dd33d Mon Sep 17 00:00:00 2001 From: pasha Date: Mon, 30 Jan 2023 20:59:09 +0100 Subject: [PATCH 13/17] fix: Do not stop if there is no p2p --- iroh-unixfs/src/content_loader.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/iroh-unixfs/src/content_loader.rs b/iroh-unixfs/src/content_loader.rs index 0399253c7..35a615353 100644 --- a/iroh-unixfs/src/content_loader.rs +++ b/iroh-unixfs/src/content_loader.rs @@ -226,10 +226,9 @@ impl FullLoader { #[async_trait] impl ContentLoader for FullLoader { async fn stop_session(&self, ctx: ContextId) -> Result<()> { - self.client - .try_p2p()? - .stop_session_bitswap(ctx.into()) - .await?; + if let Ok(p2p) = self.client.try_p2p() { + p2p.stop_session_bitswap(ctx.into()).await?; + } Ok(()) } From 9eebaeb37c5bc48655d76d1fe246c1b28c2b9348 Mon Sep 17 00:00:00 2001 From: pasha Date: Sat, 28 Jan 2023 22:14:36 +0100 Subject: [PATCH 14/17] fix: Remove async a little bit --- iroh-bitswap/src/lib.rs | 4 +- iroh-gateway/src/core.rs | 3 +- iroh-resolver/tests/roundtrip.rs | 8 ++-- iroh-resolver/tests/unixfs.rs | 3 +- iroh-share/src/lib.rs | 6 +-- iroh-share/src/sender.rs | 8 +--- iroh-unixfs/src/builder.rs | 69 ++++++++++++-------------------- 7 files changed, 37 insertions(+), 64 deletions(-) diff --git a/iroh-bitswap/src/lib.rs b/iroh-bitswap/src/lib.rs index ed814efda..04c2b1db0 100644 --- a/iroh-bitswap/src/lib.rs +++ b/iroh-bitswap/src/lib.rs @@ -161,7 +161,9 @@ impl Bitswap { let message = tokio::task::spawn_blocking(move || { message.verify_blocks(); message - }).await.expect("cannot spawn blocking thread"); + }) + .await + .expect("cannot spawn blocking thread"); if let Some(ref server) = server { futures::future::join( client.receive_message(&peer, &message), diff --git a/iroh-gateway/src/core.rs b/iroh-gateway/src/core.rs index 32b971721..d96861278 100644 --- a/iroh-gateway/src/core.rs +++ b/iroh-gateway/src/core.rs @@ -214,12 +214,11 @@ mod tests { .name(name) .content_bytes(content.to_vec()) .build() - .await .unwrap(); dir_builder = dir_builder.add_file(file); } - let root_dir = dir_builder.build().await.unwrap(); + let root_dir = dir_builder.build().unwrap(); let mut parts = root_dir.encode(); while let Some(part) = parts.next().await { let (cid, bytes, links) = part.unwrap().into_parts(); diff --git a/iroh-resolver/tests/roundtrip.rs b/iroh-resolver/tests/roundtrip.rs index b509bdd16..91191c535 100644 --- a/iroh-resolver/tests/roundtrip.rs +++ b/iroh-resolver/tests/roundtrip.rs @@ -37,8 +37,7 @@ async fn build_directory(name: &str, dir: &TestDir, hamt: bool) -> Result { @@ -47,7 +46,7 @@ async fn build_directory(name: &str, dir: &TestDir, hamt: bool) -> Result Result<()> { .chunker(param.chunker.clone()) .degree(param.degree) .content_bytes(data.clone()) - .build() - .await?; + .build()?; let stream = file.encode().await?; let (root, resolver) = stream_to_resolver(stream).await?; let out = resolver.resolve(Path::from_cid(root)).await?; diff --git a/iroh-share/src/lib.rs b/iroh-share/src/lib.rs index 720134553..756199168 100644 --- a/iroh-share/src/lib.rs +++ b/iroh-share/src/lib.rs @@ -139,8 +139,7 @@ mod tests { let file_1 = FileBuilder::new() .name("bar.txt") .content_bytes(&b"bar"[..]) - .build() - .await?; + .build()?; let mut bytes = vec![0u8; 5 * 1024 * 1024 - 8]; rand::thread_rng().fill_bytes(&mut bytes); @@ -149,8 +148,7 @@ mod tests { let file_2 = FileBuilder::new() .name("baz.txt") .content_reader(f) - .build() - .await?; + .build()?; let dir_builder = DirectoryBuilder::new() .name("foo") .add_file(file_1) diff --git a/iroh-share/src/sender.rs b/iroh-share/src/sender.rs index 5b6174961..03085e297 100644 --- a/iroh-share/src/sender.rs +++ b/iroh-share/src/sender.rs @@ -58,7 +58,7 @@ impl Sender { } = self; let t = Sha256Topic::new(format!("iroh-share-{id}")); - let root_dir = dir_builder.build().await?; + let root_dir = dir_builder.build()?; let (done_sender, done_receiver) = oneshot(); @@ -157,11 +157,7 @@ impl Sender { ) -> Result { let name = name.into(); // wrap in directory to preserve the name - let file = FileBuilder::new() - .name(name) - .content_bytes(data) - .build() - .await?; + let file = FileBuilder::new().name(name).content_bytes(data).build()?; let root_dir = DirectoryBuilder::new().add_file(file); self.transfer_from_dir_builder(root_dir).await diff --git a/iroh-unixfs/src/builder.rs b/iroh-unixfs/src/builder.rs index ff339883c..d027f36a6 100644 --- a/iroh-unixfs/src/builder.rs +++ b/iroh-unixfs/src/builder.rs @@ -395,7 +395,7 @@ impl FileBuilder { self } - pub async fn build(self) -> Result { + pub fn build(self) -> Result { let degree = self.degree; let chunker = self.chunker; let tree_builder = TreeBuilder::balanced_tree_with_degree(degree); @@ -469,8 +469,7 @@ impl Entry { .chunker(chunker) .add_path(path) .await? - .build() - .await?; + .build()?; Entry::Directory(dir) } else { anyhow::bail!("expected a ChunkerConfig in the Config"); @@ -478,11 +477,7 @@ impl Entry { } else if path.is_file() { if let Some(chunker_config) = config.chunker { let chunker = chunker_config.into(); - let file = FileBuilder::new() - .chunker(chunker) - .path(path) - .build() - .await?; + let file = FileBuilder::new().chunker(chunker).path(path).build()?; Entry::File(file) } else { anyhow::bail!("expected a ChunkerConfig in the Config"); @@ -597,7 +592,7 @@ impl DirectoryBuilder { )) } - pub async fn build(self) -> Result { + pub fn build(self) -> Result { let DirectoryBuilder { name, entries, typ, .. } = self; @@ -797,8 +792,7 @@ async fn make_entries_from_path>( .chunker(chunker.clone()) .degree(degree) .path(path) - .build() - .await?, + .build()?, )); } else if path.is_dir() { entries.push(Entry::Directory( @@ -813,8 +807,7 @@ async fn make_entries_from_path>( .await? .into_iter(), ) - .build() - .await?, + .build()?, )) } else { anyhow::bail!("directory entry is neither file nor directory nor symlink") @@ -839,14 +832,12 @@ mod tests { let bar = FileBuilder::new() .name("bar.txt") .content_bytes(b"bar".to_vec()) - .build() - .await?; + .build()?; let bar_encoded: Vec<_> = { let bar = FileBuilder::new() .name("bar.txt") .content_bytes(b"bar".to_vec()) - .build() - .await?; + .build()?; bar.encode().await?.try_collect().await? }; assert_eq!(bar_encoded.len(), 1); @@ -862,7 +853,7 @@ mod tests { baz.encode()? }; - let dir = dir.add_file(bar).add_symlink(baz).build().await?; + let dir = dir.add_file(bar).add_symlink(baz).build()?; let dir_block = dir.encode_root().await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; @@ -879,7 +870,7 @@ mod tests { #[tokio::test] async fn test_recursive_dir_builder() -> Result<()> { - let dir = DirectoryBuilder::new().build().await?; + let dir = DirectoryBuilder::new().build()?; DirectoryBuilder::new() .add_dir(dir) @@ -897,15 +888,13 @@ mod tests { let bar = FileBuilder::new() .name("bar.txt") .content_reader(bar_reader) - .build() - .await?; + .build()?; let bar_encoded: Vec<_> = { let bar_reader = std::io::Cursor::new(b"bar"); let bar = FileBuilder::new() .name("bar.txt") .content_reader(bar_reader) - .build() - .await?; + .build()?; bar.encode().await?.try_collect().await? }; assert_eq!(bar_encoded.len(), 1); @@ -921,7 +910,7 @@ mod tests { baz.encode()? }; - let dir = dir.add_file(bar).add_symlink(baz).build().await?; + let dir = dir.add_file(bar).add_symlink(baz).build()?; let dir_block = dir.encode_root().await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; @@ -960,15 +949,13 @@ mod tests { let bar = FileBuilder::new() .name("bar.txt") .content_reader(bar_reader) - .build() - .await?; + .build()?; let bar_encoded: Vec<_> = { let bar_reader = std::io::Cursor::new(vec![1u8; 1024 * 1024]); let bar = FileBuilder::new() .name("bar.txt") .content_reader(bar_reader) - .build() - .await?; + .build()?; bar.encode().await?.try_collect().await? }; assert_eq!(bar_encoded.len(), 5); @@ -985,20 +972,18 @@ mod tests { let baz = FileBuilder::new() .name("baz.txt") .content_reader(baz_reader) - .build() - .await?; + .build()?; let baz_encoded: Vec<_> = { let baz_reader = std::io::Cursor::new(baz_content); let baz = FileBuilder::new() .name("baz.txt") .content_reader(baz_reader) - .build() - .await?; + .build()?; baz.encode().await?.try_collect().await? }; assert_eq!(baz_encoded.len(), 9); - let dir = dir.add_file(bar).add_file(baz).build().await?; + let dir = dir.add_file(bar).add_file(baz).build()?; let dir_block = dir.encode_root().await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; @@ -1039,8 +1024,7 @@ mod tests { let file = FileBuilder::new() .name("foo.txt") .content_bytes(Bytes::from("hello world")) - .build() - .await?; + .build()?; builder = builder.add_file(file); } @@ -1050,8 +1034,7 @@ mod tests { let file = FileBuilder::new() .name("foo.txt") .content_bytes(Bytes::from("hello world")) - .build() - .await?; + .build()?; builder = builder.add_file(file); // at directory link limit should be processed as a hamt @@ -1067,11 +1050,10 @@ mod tests { let file = FileBuilder::new() .name("foo.txt") .content_bytes(Bytes::from("hello world")) - .build() - .await?; + .build()?; builder = builder.add_file(file); } - assert!(builder.build().await.is_err()); + assert!(builder.build().is_err()); Ok(()) } @@ -1102,7 +1084,7 @@ mod tests { file.write_all(b"hello world").unwrap(); // create directory manually - let nested_file = FileBuilder::new().path(nested_file_path).build().await?; + let nested_file = FileBuilder::new().path(nested_file_path).build()?; let nested_dir = Directory::single( String::from( nested_dir_path @@ -1113,7 +1095,7 @@ mod tests { Entry::File(nested_file), ); - let file = FileBuilder::new().path(file_path).build().await?; + let file = FileBuilder::new().path(file_path).build()?; let expected = Directory::basic( String::from(dir.clone().file_name().and_then(|s| s.to_str()).unwrap()), @@ -1130,8 +1112,7 @@ mod tests { .await? .into_iter(), ) - .build() - .await?; + .build()?; let basic_entries = |dir: Directory| match dir { Directory::Basic(basic) => basic.entries, From e8d64d8e7b6524fe1df89b6e05be54ef607b749e Mon Sep 17 00:00:00 2001 From: pasha Date: Mon, 30 Jan 2023 07:35:37 +0100 Subject: [PATCH 15/17] Make `TreeNode` public --- iroh-api/src/api.rs | 2 +- iroh-api/src/store.rs | 2 +- iroh-unixfs/src/balanced_tree.rs | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/iroh-api/src/api.rs b/iroh-api/src/api.rs index 3d7402ab6..444f08c58 100644 --- a/iroh-api/src/api.rs +++ b/iroh-api/src/api.rs @@ -192,7 +192,7 @@ impl Api { }; Ok(Box::pin( - add_blocks_to_store(Some(self.client.clone()), blocks).await, + add_blocks_to_store(Some(self.client.clone()), blocks), )) } diff --git a/iroh-api/src/store.rs b/iroh-api/src/store.rs index 921f39e1a..96e471f7a 100644 --- a/iroh-api/src/store.rs +++ b/iroh-api/src/store.rs @@ -85,7 +85,7 @@ fn add_blocks_to_store_chunked( } } -pub async fn add_blocks_to_store( +pub fn add_blocks_to_store( store: Option, blocks: Pin> + Send>>, ) -> impl Stream> { diff --git a/iroh-unixfs/src/balanced_tree.rs b/iroh-unixfs/src/balanced_tree.rs index 9b11584ad..8af22a1d8 100644 --- a/iroh-unixfs/src/balanced_tree.rs +++ b/iroh-unixfs/src/balanced_tree.rs @@ -94,7 +94,7 @@ fn stream_balanced_tree( // check if the leaf node of the tree is full if tree[0].len() == degree { // if so, iterate through nodes - for i in 0..tree_len { + for i in 1..tree_len { // if we encounter any nodes that are not full, break if tree[i].len() < degree { break; @@ -115,13 +115,13 @@ fn stream_balanced_tree( // add link_info to parent node tree[i+1].push((cid, link_info)); } - // at this point the tree will be able to recieve new links + // at this point the tree will be able to receive new links // without "overflowing", aka the leaf node and stem nodes // have fewer than `degree` number of links } // now that we know the tree is in a "healthy" state to - // recieve more links, add the link to the tree + // receive more links, add the link to the tree tree[0].push((*block.cid(), link_info)); yield block; // at this point, the leaf node may have `degree` number of @@ -134,7 +134,7 @@ fn stream_balanced_tree( } // clean up, aka yield the rest of the stem nodes - // since all the stem nodes are able to recieve links + // since all the stem nodes are able to receive links // we don't have to worry about "overflow" while let Some(links) = tree.pop_front() { let (block, link_info) = TreeNode::Stem(links).encode()?; @@ -191,7 +191,7 @@ fn create_unixfs_node_from_links(links: Vec<(Cid, LinkInfo)>) -> Result), } From 430f3dd53fc057d046311a0d98d0737349da0a93 Mon Sep 17 00:00:00 2001 From: pasha Date: Mon, 30 Jan 2023 08:20:53 +0100 Subject: [PATCH 16/17] feat: Expose multihash::Code in some API --- iroh-api/src/api.rs | 12 +++-- iroh-gateway/src/core.rs | 3 +- iroh-resolver/tests/roundtrip.rs | 5 +- iroh-share/src/sender.rs | 3 +- iroh-unixfs/src/balanced_tree.rs | 92 +++++++++++++++++++++++--------- iroh-unixfs/src/builder.rs | 66 +++++++++++++---------- iroh-unixfs/src/unixfs.rs | 13 ++--- 7 files changed, 124 insertions(+), 70 deletions(-) diff --git a/iroh-api/src/api.rs b/iroh-api/src/api.rs index 444f08c58..2036ad547 100644 --- a/iroh-api/src/api.rs +++ b/iroh-api/src/api.rs @@ -11,6 +11,7 @@ use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use iroh_resolver::resolver::Resolver; use iroh_rpc_client::{Client, ClientStatus}; +use iroh_unixfs::balanced_tree::DEFAULT_CODE; use iroh_unixfs::{ builder::Entry as UnixfsEntry, content_loader::{FullLoader, FullLoaderConfig}, @@ -182,18 +183,19 @@ impl Api { ) -> Result>> { let blocks = match entry { UnixfsEntry::File(f) => f.encode().await?.boxed(), - UnixfsEntry::Directory(d) => d.encode(), + UnixfsEntry::Directory(d) => d.encode(&DEFAULT_CODE), UnixfsEntry::Symlink(s) => Box::pin(async_stream::try_stream! { - yield s.encode()? + yield s.encode(&DEFAULT_CODE)? }), UnixfsEntry::RawBlock(r) => Box::pin(async_stream::try_stream! { yield r.encode()? }), }; - Ok(Box::pin( - add_blocks_to_store(Some(self.client.clone()), blocks), - )) + Ok(Box::pin(add_blocks_to_store( + Some(self.client.clone()), + blocks, + ))) } /// The `add` method encodes the entry into a DAG and adds the resulting diff --git a/iroh-gateway/src/core.rs b/iroh-gateway/src/core.rs index d96861278..bc28c99da 100644 --- a/iroh-gateway/src/core.rs +++ b/iroh-gateway/src/core.rs @@ -130,6 +130,7 @@ mod tests { use iroh_rpc_client::Config as RpcClientConfig; use iroh_rpc_types::store::StoreAddr; use iroh_rpc_types::Addr; + use iroh_unixfs::balanced_tree::DEFAULT_CODE; use iroh_unixfs::builder::{DirectoryBuilder, FileBuilder}; use iroh_unixfs::content_loader::{FullLoader, FullLoaderConfig}; use iroh_unixfs::unixfs::UnixfsNode; @@ -219,7 +220,7 @@ mod tests { } let root_dir = dir_builder.build().unwrap(); - let mut parts = root_dir.encode(); + let mut parts = root_dir.encode(&DEFAULT_CODE); while let Some(part) = parts.next().await { let (cid, bytes, links) = part.unwrap().into_parts(); cids.push(cid); diff --git a/iroh-resolver/tests/roundtrip.rs b/iroh-resolver/tests/roundtrip.rs index 91191c535..f8e34a0dc 100644 --- a/iroh-resolver/tests/roundtrip.rs +++ b/iroh-resolver/tests/roundtrip.rs @@ -16,6 +16,7 @@ use std::collections::BTreeMap; use tokio::io::AsyncReadExt; use iroh_resolver::resolver::{read_to_vec, stream_to_resolver, Out, Resolver}; +use iroh_unixfs::balanced_tree::DEFAULT_CODE; #[derive(Debug, Clone, PartialEq, Eq)] enum TestDirEntry { @@ -109,7 +110,7 @@ async fn build_testdir( /// a roundtrip test that converts a dir to an unixfs DAG and back async fn dir_roundtrip_test(dir: TestDir, hamt: bool) -> Result { let directory = build_directory("", &dir, hamt).await?; - let stream = directory.encode(); + let stream = directory.encode(&DEFAULT_CODE); let (root, resolver) = stream_to_resolver(stream).await?; let stream = resolver.resolve_recursive_with_paths(iroh_resolver::resolver::Path::from_cid(root)); @@ -154,7 +155,7 @@ async fn symlink_roundtrip_test() -> Result<()> { let target = "../../bar.txt"; builder.target(target); let sym = builder.build().await?; - let block = sym.encode()?; + let block = sym.encode(&DEFAULT_CODE)?; let stream = async_stream::try_stream! { yield block; }; diff --git a/iroh-share/src/sender.rs b/iroh-share/src/sender.rs index 03085e297..ef38a93d7 100644 --- a/iroh-share/src/sender.rs +++ b/iroh-share/src/sender.rs @@ -5,6 +5,7 @@ use bytes::Bytes; use futures::channel::oneshot::{channel as oneshot, Receiver as OneShotReceiver}; use futures::StreamExt; use iroh_p2p::{GossipsubEvent, NetworkEvent}; +use iroh_unixfs::balanced_tree::DEFAULT_CODE; use iroh_unixfs::builder::{DirectoryBuilder, FileBuilder}; use libp2p::gossipsub::Sha256Topic; use rand::Rng; @@ -65,7 +66,7 @@ impl Sender { let p2p_rpc = p2p.rpc().try_p2p()?; let store = p2p.rpc().try_store()?; let (root, num_parts) = { - let parts = root_dir.encode(); + let parts = root_dir.encode(&DEFAULT_CODE); tokio::pin!(parts); let mut num_parts = 0; let mut root_cid = None; diff --git a/iroh-unixfs/src/balanced_tree.rs b/iroh-unixfs/src/balanced_tree.rs index 8af22a1d8..d2efab10b 100644 --- a/iroh-unixfs/src/balanced_tree.rs +++ b/iroh-unixfs/src/balanced_tree.rs @@ -13,22 +13,26 @@ use crate::unixfs::{dag_pb, unixfs_pb, DataType, Node, UnixfsNode}; /// Default degree number for balanced tree, taken from unixfs specs /// pub const DEFAULT_DEGREE: usize = 174; +pub const DEFAULT_CODE: multihash::Code = multihash::Code::Sha2_256; #[derive(Debug, PartialEq, Eq)] pub enum TreeBuilder { /// TreeBuilder that builds a "balanced tree" with a max degree size of /// degree - Balanced { degree: usize }, + Balanced { + degree: usize, + code: multihash::Code, + }, } impl TreeBuilder { pub fn balanced_tree() -> Self { - Self::balanced_tree_with_degree(DEFAULT_DEGREE) + Self::balanced_tree_with_degree_and_code(DEFAULT_DEGREE, DEFAULT_CODE) } - pub fn balanced_tree_with_degree(degree: usize) -> Self { + pub fn balanced_tree_with_degree_and_code(degree: usize, code: multihash::Code) -> Self { assert!(degree > 1); - TreeBuilder::Balanced { degree } + TreeBuilder::Balanced { degree, code } } pub fn stream_tree( @@ -36,20 +40,32 @@ impl TreeBuilder { chunks: impl Stream> + Send, ) -> impl Stream> { match self { - TreeBuilder::Balanced { degree } => stream_balanced_tree(chunks, *degree), + TreeBuilder::Balanced { degree, code } => { + stream_balanced_tree(chunks, *degree, code.clone()) + } } } } #[derive(Clone, Debug, PartialEq)] -struct LinkInfo { - raw_data_len: u64, - encoded_len: u64, +pub struct LinkInfo { + pub raw_data_len: u64, + pub encoded_len: u64, +} + +impl LinkInfo { + pub fn new(raw_data_len: u64, encoded_len: u64) -> LinkInfo{ + LinkInfo { + raw_data_len, + encoded_len + } + } } fn stream_balanced_tree( in_stream: impl Stream> + Send, degree: usize, + code: multihash::Code, ) -> impl Stream> { try_stream! { // degree = 8 @@ -80,8 +96,9 @@ fn stream_balanced_tree( let hash_par: usize = 8; let in_stream = in_stream.err_into::().map(|chunk| { - tokio::task::spawn_blocking(|| { - chunk.and_then(|chunk| TreeNode::Leaf(chunk).encode()) + let code = code.clone(); + tokio::task::spawn_blocking(move || { + chunk.and_then(|chunk| TreeNode::Leaf(chunk).encode(&code)) }).err_into::() }).buffered(hash_par).map(|x| x.and_then(|x| x)); @@ -108,7 +125,7 @@ fn stream_balanced_tree( // create node, keeping the cid let links = std::mem::replace(&mut tree[i], Vec::with_capacity(degree)); - let (block, link_info) = TreeNode::Stem(links).encode()?; + let (block, link_info) = TreeNode::Stem(links).encode(&code)?; let cid = *block.cid(); yield block; @@ -137,7 +154,7 @@ fn stream_balanced_tree( // since all the stem nodes are able to receive links // we don't have to worry about "overflow" while let Some(links) = tree.pop_front() { - let (block, link_info) = TreeNode::Stem(links).encode()?; + let (block, link_info) = TreeNode::Stem(links).encode(&code)?; let cid = *block.cid(); yield block; @@ -191,18 +208,19 @@ fn create_unixfs_node_from_links(links: Vec<(Cid, LinkInfo)>) -> Result), } impl TreeNode { - fn encode(self) -> Result<(Block, LinkInfo)> { + pub fn encode(self, code: &multihash::Code) -> Result<(Block, LinkInfo)> { match self { TreeNode::Leaf(bytes) => { let len = bytes.len(); let node = UnixfsNode::Raw(bytes); - let block = node.encode()?; + let block = node.encode(code)?; let link_info = LinkInfo { // in a leaf the raw data len and encoded len are the same since our leaf // nodes are raw unixfs nodes @@ -214,7 +232,7 @@ impl TreeNode { TreeNode::Stem(links) => { let mut encoded_len: u64 = links.iter().map(|(_, l)| l.encoded_len).sum(); let node = create_unixfs_node_from_links(links)?; - let block = node.encode()?; + let block = node.encode(code)?; encoded_len += block.data().len() as u64; let raw_data_len = node .filesize() @@ -253,7 +271,7 @@ mod tests { if num_chunks / degree == 0 { let chunk = chunks.next().await.unwrap().unwrap(); let leaf = TreeNode::Leaf(chunk); - let (block, _) = leaf.encode().unwrap(); + let (block, _) = leaf.encode(&multihash::Code::Sha2_256).unwrap(); tree[0].push(block); return tree; } @@ -261,7 +279,7 @@ mod tests { while let Some(chunk) = chunks.next().await { let chunk = chunk.unwrap(); let leaf = TreeNode::Leaf(chunk); - let (block, link_info) = leaf.encode().unwrap(); + let (block, link_info) = leaf.encode(&multihash::Code::Sha2_256).unwrap(); links[0].push((*block.cid(), link_info)); tree[0].push(block); } @@ -273,7 +291,7 @@ mod tests { let mut links_layer = Vec::with_capacity(count); for links in prev_layer.chunks(degree) { let stem = TreeNode::Stem(links.to_vec()); - let (block, link_info) = stem.encode().unwrap(); + let (block, link_info) = stem.encode(&multihash::Code::Sha2_256).unwrap(); links_layer.push((*block.cid(), link_info)); tree_layer.push(block); } @@ -337,12 +355,14 @@ mod tests { fn make_leaf(data: usize) -> (Block, LinkInfo) { TreeNode::Leaf(BytesMut::from(&data.to_be_bytes()[..]).freeze()) - .encode() + .encode(&multihash::Code::Sha2_256) .unwrap() } fn make_stem(links: Vec<(Cid, LinkInfo)>) -> (Block, LinkInfo) { - TreeNode::Stem(links).encode().unwrap() + TreeNode::Stem(links) + .encode(&multihash::Code::Sha2_256) + .unwrap() } #[tokio::test] @@ -450,7 +470,7 @@ mod tests { async fn balanced_tree_test_leaf() { let num_chunks = 1; let expect = build_expect(num_chunks, 3).await; - let got = stream_balanced_tree(test_chunk_stream(1), 3); + let got = stream_balanced_tree(test_chunk_stream(1), 3, multihash::Code::Sha2_256); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } @@ -460,7 +480,11 @@ mod tests { let num_chunks = 3; let degrees = 3; let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + let got = stream_balanced_tree( + test_chunk_stream(num_chunks), + degrees, + multihash::Code::Sha2_256, + ); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } @@ -470,7 +494,11 @@ mod tests { let degrees = 3; let num_chunks = 9; let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + let got = stream_balanced_tree( + test_chunk_stream(num_chunks), + degrees, + multihash::Code::Sha2_256, + ); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } @@ -480,7 +508,11 @@ mod tests { let degrees = 3; let num_chunks = 10; let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + let got = stream_balanced_tree( + test_chunk_stream(num_chunks), + degrees, + multihash::Code::Sha2_256, + ); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } @@ -490,7 +522,11 @@ mod tests { let num_chunks = 125; let degrees = 5; let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + let got = stream_balanced_tree( + test_chunk_stream(num_chunks), + degrees, + multihash::Code::Sha2_256, + ); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } @@ -500,7 +536,11 @@ mod tests { let num_chunks = 780; let degrees = 11; let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + let got = stream_balanced_tree( + test_chunk_stream(num_chunks), + degrees, + multihash::Code::Sha2_256, + ); tokio::pin!(got); ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await; } diff --git a/iroh-unixfs/src/builder.rs b/iroh-unixfs/src/builder.rs index d027f36a6..f636ea511 100644 --- a/iroh-unixfs/src/builder.rs +++ b/iroh-unixfs/src/builder.rs @@ -15,6 +15,7 @@ use futures::{ use prost::Message; use tokio::io::AsyncRead; +use crate::balanced_tree::DEFAULT_CODE; use crate::{ balanced_tree::{TreeBuilder, DEFAULT_DEGREE}, chunker::{self, Chunker, ChunkerConfig, DEFAULT_CHUNK_SIZE_LIMIT}, @@ -92,9 +93,9 @@ impl Directory { Directory::single("".into(), Entry::Directory(self)) } - pub async fn encode_root(self) -> Result { + pub async fn encode_root(self, code: &multihash::Code) -> Result { let mut current = None; - let parts = self.encode(); + let parts = self.encode(code); tokio::pin!(parts); while let Some(part) = parts.next().await { @@ -104,33 +105,36 @@ impl Directory { current.expect("must not be empty") } - pub fn encode<'a>(self) -> BoxStream<'a, Result> { + pub fn encode<'a>(self, code: &multihash::Code) -> BoxStream<'a, Result> { match self { - Directory::Basic(basic) => basic.encode(), - Directory::Hamt(hamt) => hamt.encode(), + Directory::Basic(basic) => basic.encode(code), + Directory::Hamt(hamt) => hamt.encode(code), } } } impl BasicDirectory { - pub fn encode<'a>(self) -> BoxStream<'a, Result> { + pub fn encode<'a>(self, code: &multihash::Code) -> BoxStream<'a, Result> { + let code = code.clone(); async_stream::try_stream! { let mut links = Vec::new(); for entry in self.entries { let name = entry.name().to_string(); - let parts = entry.encode().await?; + let parts = entry.encode(&code).await?; tokio::pin!(parts); let mut root = None; + let mut size = 0u64; while let Some(part) = parts.next().await { let block = part?; root = Some(block.clone()); + size += block.data().len() as u64; yield block; } let root_block = root.expect("file must not be empty"); links.push(dag_pb::PbLink { hash: Some(root_block.cid().to_bytes()), name: Some(name), - tsize: Some(root_block.data().len() as u64), + tsize: Some(size), }); } @@ -141,15 +145,15 @@ impl BasicDirectory { }; let outer = encode_unixfs_pb(&inner, links)?; let node = UnixfsNode::Directory(Node { outer, inner }); - yield node.encode()?; + yield node.encode(&code)?; } .boxed() } } impl HamtDirectory { - pub fn encode<'a>(self) -> BoxStream<'a, Result> { - self.hamt.encode() + pub fn encode<'a>(self, code: &multihash::Code) -> BoxStream<'a, Result> { + self.hamt.encode(code) } } @@ -260,7 +264,7 @@ impl Symlink { &self.name } - pub fn encode(self) -> Result { + pub fn encode(self, code: &multihash::Code) -> Result { let target = self .target .to_str() @@ -273,7 +277,7 @@ impl Symlink { }; let outer = encode_unixfs_pb(&inner, Vec::new())?; let node = UnixfsNode::Symlink(Node { outer, inner }); - node.encode() + node.encode(code) } } @@ -315,6 +319,7 @@ pub struct FileBuilder { reader: Option>>, chunker: Chunker, degree: usize, + code: multihash::Code, } impl Default for FileBuilder { @@ -325,6 +330,7 @@ impl Default for FileBuilder { reader: None, chunker: Chunker::Fixed(chunker::Fixed::default()), degree: DEFAULT_DEGREE, + code: DEFAULT_CODE, } } } @@ -384,6 +390,11 @@ impl FileBuilder { self } + pub fn code(mut self, code: multihash::Code) -> Self { + self.code = code; + self + } + pub fn content_bytes>(mut self, content: B) -> Self { let bytes = content.into(); self.reader = Some(Box::pin(std::io::Cursor::new(bytes))); @@ -398,7 +409,7 @@ impl FileBuilder { pub fn build(self) -> Result { let degree = self.degree; let chunker = self.chunker; - let tree_builder = TreeBuilder::balanced_tree_with_degree(degree); + let tree_builder = TreeBuilder::balanced_tree_with_degree_and_code(degree, self.code); if let Some(path) = self.path { let name = match self.name { Some(n) => n, @@ -452,11 +463,11 @@ impl Entry { } } - pub async fn encode(self) -> Result>> { + pub async fn encode(self, code: &multihash::Code) -> Result>> { Ok(match self { Entry::File(f) => f.encode().await?.boxed(), - Entry::Directory(d) => d.encode(), - Entry::Symlink(s) => stream::iter(Some(s.encode())).boxed(), + Entry::Directory(d) => d.encode(code), + Entry::Symlink(s) => stream::iter(Some(s.encode(code))).boxed(), Entry::RawBlock(r) => stream::iter(Some(r.encode())).boxed(), }) } @@ -669,7 +680,8 @@ impl HamtNode { } } - pub fn encode<'a>(self) -> BoxStream<'a, Result> { + pub fn encode<'a>(self, code: &multihash::Code) -> BoxStream<'a, Result> { + let code = code.clone(); match self { Self::Branch(tree) => { async_stream::try_stream! { @@ -678,7 +690,7 @@ impl HamtNode { for (prefix, node) in tree { let name = format!("{:02X}{}", prefix, node.name()); bitfield.set_bit(prefix); - let blocks = node.encode(); + let blocks = node.encode(&code); let mut root = None; tokio::pin!(blocks); while let Some(block) = blocks.next().await { @@ -703,11 +715,11 @@ impl HamtNode { // it does not really matter what enum variant we choose here as long as // it is not raw. The type of the node will be HamtShard from above. let node = UnixfsNode::Directory(crate::unixfs::Node { outer, inner }); - yield node.encode()?; + yield node.encode(&code)?; } .boxed() } - Self::Leaf(HamtLeaf(_hash, entry)) => async move { entry.encode().await } + Self::Leaf(HamtLeaf(_hash, entry)) => async move { entry.encode(&code).await } .try_flatten_stream() .boxed(), } @@ -749,7 +761,7 @@ impl SymlinkBuilder { } } -pub(crate) fn encode_unixfs_pb( +pub fn encode_unixfs_pb( inner: &unixfs_pb::Data, links: Vec, ) -> Result { @@ -850,12 +862,12 @@ mod tests { let mut baz = SymlinkBuilder::new("baz.txt"); baz.target("bat.txt"); let baz = baz.build().await?; - baz.encode()? + baz.encode(&multihash::Code::Sha2_256)? }; let dir = dir.add_file(bar).add_symlink(baz).build()?; - let dir_block = dir.encode_root().await?; + let dir_block = dir.encode_root(&multihash::Code::Sha2_256).await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; let links = decoded_dir.links().collect::>>().unwrap(); @@ -907,12 +919,12 @@ mod tests { let mut baz = SymlinkBuilder::new("baz.txt"); baz.target("bat.txt"); let baz = baz.build().await?; - baz.encode()? + baz.encode(&multihash::Code::Sha2_256)? }; let dir = dir.add_file(bar).add_symlink(baz).build()?; - let dir_block = dir.encode_root().await?; + let dir_block = dir.encode_root(&multihash::Code::Sha2_256).await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; let links = decoded_dir.links().collect::>>().unwrap(); @@ -985,7 +997,7 @@ mod tests { let dir = dir.add_file(bar).add_file(baz).build()?; - let dir_block = dir.encode_root().await?; + let dir_block = dir.encode_root(&multihash::Code::Sha2_256).await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; let links = decoded_dir.links().collect::>>().unwrap(); diff --git a/iroh-unixfs/src/unixfs.rs b/iroh-unixfs/src/unixfs.rs index 4fe8553e0..275802e9c 100644 --- a/iroh-unixfs/src/unixfs.rs +++ b/iroh-unixfs/src/unixfs.rs @@ -21,12 +21,12 @@ use crate::{ types::{Block, Link, LinkRef, Links, PbLinks}, }; -pub(crate) mod unixfs_pb { +pub mod unixfs_pb { #![allow(clippy::all)] include!(concat!(env!("OUT_DIR"), "/unixfs_pb.rs")); } -pub(crate) mod dag_pb { +pub mod dag_pb { #![allow(clippy::all)] include!(concat!(env!("OUT_DIR"), "/merkledag_pb.rs")); } @@ -179,12 +179,12 @@ impl UnixfsNode { } } - pub fn encode(&self) -> Result { + pub fn encode(&self, code: &multihash::Code) -> Result { let res = match self { UnixfsNode::Raw(data) => { let out = data.clone(); let links = vec![]; - let cid = Cid::new_v1(Codec::Raw as _, cid::multihash::Code::Sha2_256.digest(&out)); + let cid = Cid::new_v1(Codec::Raw as _, code.digest(&out)); Block::new(cid, out, links) } UnixfsNode::RawNode(node) @@ -197,10 +197,7 @@ impl UnixfsNode { .links() .map(|x| Ok(x?.cid)) .collect::>>()?; - let cid = Cid::new_v1( - Codec::DagPb as _, - cid::multihash::Code::Sha2_256.digest(&out), - ); + let cid = Cid::new_v1(Codec::DagPb as _, code.digest(&out)); Block::new(cid, out, links) } }; From f6f44b39dddd93784e39ad06aafb06a60d1b8dc2 Mon Sep 17 00:00:00 2001 From: pasha Date: Thu, 23 Feb 2023 21:40:23 +0100 Subject: [PATCH 17/17] fix: clippy --- iroh-api/src/api.rs | 4 ++-- iroh-bitswap/src/lib.rs | 14 +++--------- iroh-gateway/src/client.rs | 5 ++--- iroh-gateway/src/core.rs | 2 +- iroh-resolver/tests/roundtrip.rs | 4 ++-- iroh-share/src/sender.rs | 2 +- iroh-unixfs/src/balanced_tree.rs | 26 ++++++++++------------ iroh-unixfs/src/builder.rs | 38 +++++++++++++++----------------- iroh-unixfs/src/unixfs.rs | 2 +- 9 files changed, 42 insertions(+), 55 deletions(-) diff --git a/iroh-api/src/api.rs b/iroh-api/src/api.rs index 2036ad547..76b504638 100644 --- a/iroh-api/src/api.rs +++ b/iroh-api/src/api.rs @@ -183,9 +183,9 @@ impl Api { ) -> Result>> { let blocks = match entry { UnixfsEntry::File(f) => f.encode().await?.boxed(), - UnixfsEntry::Directory(d) => d.encode(&DEFAULT_CODE), + UnixfsEntry::Directory(d) => d.encode(DEFAULT_CODE), UnixfsEntry::Symlink(s) => Box::pin(async_stream::try_stream! { - yield s.encode(&DEFAULT_CODE)? + yield s.encode(DEFAULT_CODE)? }), UnixfsEntry::RawBlock(r) => Box::pin(async_stream::try_stream! { yield r.encode()? diff --git a/iroh-bitswap/src/lib.rs b/iroh-bitswap/src/lib.rs index 04c2b1db0..f58fc056f 100644 --- a/iroh-bitswap/src/lib.rs +++ b/iroh-bitswap/src/lib.rs @@ -77,21 +77,16 @@ pub struct Bitswap { _workers: Arc>>, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] enum PeerState { Connected(ConnectionId), Responsive(ConnectionId, ProtocolId), Unresponsive, + #[default] Disconnected, DialFailure(Instant), } -impl Default for PeerState { - fn default() -> Self { - PeerState::Disconnected - } -} - impl PeerState { fn is_connected(self) -> bool { matches!(self, PeerState::Connected(_) | PeerState::Responsive(_, _)) @@ -496,10 +491,7 @@ impl NetworkBehaviour for Bitswap { } } } - HandlerEvent::Message { - mut message, - protocol, - } => { + HandlerEvent::Message { message, protocol } => { self.set_peer_state(&peer_id, PeerState::Responsive(connection, protocol)); self.receive_message(peer_id, message); } diff --git a/iroh-gateway/src/client.rs b/iroh-gateway/src/client.rs index c1449438e..d5fcba74b 100644 --- a/iroh-gateway/src/client.rs +++ b/iroh-gateway/src/client.rs @@ -140,11 +140,10 @@ impl Client { ) -> Result { info!("retrieve path metadata {}", path); if raw_format { - return self - .resolver + self.resolver .resolve_raw(path) .await - .map_err(|e| e.to_string()); + .map_err(|e| e.to_string()) } else { self.resolver.resolve(path).await.map_err(|e| e.to_string()) } diff --git a/iroh-gateway/src/core.rs b/iroh-gateway/src/core.rs index bc28c99da..55fc9f76f 100644 --- a/iroh-gateway/src/core.rs +++ b/iroh-gateway/src/core.rs @@ -220,7 +220,7 @@ mod tests { } let root_dir = dir_builder.build().unwrap(); - let mut parts = root_dir.encode(&DEFAULT_CODE); + let mut parts = root_dir.encode(DEFAULT_CODE); while let Some(part) = parts.next().await { let (cid, bytes, links) = part.unwrap().into_parts(); cids.push(cid); diff --git a/iroh-resolver/tests/roundtrip.rs b/iroh-resolver/tests/roundtrip.rs index f8e34a0dc..170782efd 100644 --- a/iroh-resolver/tests/roundtrip.rs +++ b/iroh-resolver/tests/roundtrip.rs @@ -110,7 +110,7 @@ async fn build_testdir( /// a roundtrip test that converts a dir to an unixfs DAG and back async fn dir_roundtrip_test(dir: TestDir, hamt: bool) -> Result { let directory = build_directory("", &dir, hamt).await?; - let stream = directory.encode(&DEFAULT_CODE); + let stream = directory.encode(DEFAULT_CODE); let (root, resolver) = stream_to_resolver(stream).await?; let stream = resolver.resolve_recursive_with_paths(iroh_resolver::resolver::Path::from_cid(root)); @@ -155,7 +155,7 @@ async fn symlink_roundtrip_test() -> Result<()> { let target = "../../bar.txt"; builder.target(target); let sym = builder.build().await?; - let block = sym.encode(&DEFAULT_CODE)?; + let block = sym.encode(DEFAULT_CODE)?; let stream = async_stream::try_stream! { yield block; }; diff --git a/iroh-share/src/sender.rs b/iroh-share/src/sender.rs index ef38a93d7..019ca3167 100644 --- a/iroh-share/src/sender.rs +++ b/iroh-share/src/sender.rs @@ -66,7 +66,7 @@ impl Sender { let p2p_rpc = p2p.rpc().try_p2p()?; let store = p2p.rpc().try_store()?; let (root, num_parts) = { - let parts = root_dir.encode(&DEFAULT_CODE); + let parts = root_dir.encode(DEFAULT_CODE); tokio::pin!(parts); let mut num_parts = 0; let mut root_cid = None; diff --git a/iroh-unixfs/src/balanced_tree.rs b/iroh-unixfs/src/balanced_tree.rs index d2efab10b..e62efd6b3 100644 --- a/iroh-unixfs/src/balanced_tree.rs +++ b/iroh-unixfs/src/balanced_tree.rs @@ -40,9 +40,7 @@ impl TreeBuilder { chunks: impl Stream> + Send, ) -> impl Stream> { match self { - TreeBuilder::Balanced { degree, code } => { - stream_balanced_tree(chunks, *degree, code.clone()) - } + TreeBuilder::Balanced { degree, code } => stream_balanced_tree(chunks, *degree, *code), } } } @@ -54,10 +52,10 @@ pub struct LinkInfo { } impl LinkInfo { - pub fn new(raw_data_len: u64, encoded_len: u64) -> LinkInfo{ + pub fn new(raw_data_len: u64, encoded_len: u64) -> LinkInfo { LinkInfo { raw_data_len, - encoded_len + encoded_len, } } } @@ -98,7 +96,7 @@ fn stream_balanced_tree( let in_stream = in_stream.err_into::().map(|chunk| { let code = code.clone(); tokio::task::spawn_blocking(move || { - chunk.and_then(|chunk| TreeNode::Leaf(chunk).encode(&code)) + chunk.and_then(|chunk| TreeNode::Leaf(chunk).encode(code)) }).err_into::() }).buffered(hash_par).map(|x| x.and_then(|x| x)); @@ -125,7 +123,7 @@ fn stream_balanced_tree( // create node, keeping the cid let links = std::mem::replace(&mut tree[i], Vec::with_capacity(degree)); - let (block, link_info) = TreeNode::Stem(links).encode(&code)?; + let (block, link_info) = TreeNode::Stem(links).encode(code)?; let cid = *block.cid(); yield block; @@ -154,7 +152,7 @@ fn stream_balanced_tree( // since all the stem nodes are able to receive links // we don't have to worry about "overflow" while let Some(links) = tree.pop_front() { - let (block, link_info) = TreeNode::Stem(links).encode(&code)?; + let (block, link_info) = TreeNode::Stem(links).encode(code)?; let cid = *block.cid(); yield block; @@ -215,7 +213,7 @@ pub enum TreeNode { } impl TreeNode { - pub fn encode(self, code: &multihash::Code) -> Result<(Block, LinkInfo)> { + pub fn encode(self, code: multihash::Code) -> Result<(Block, LinkInfo)> { match self { TreeNode::Leaf(bytes) => { let len = bytes.len(); @@ -271,7 +269,7 @@ mod tests { if num_chunks / degree == 0 { let chunk = chunks.next().await.unwrap().unwrap(); let leaf = TreeNode::Leaf(chunk); - let (block, _) = leaf.encode(&multihash::Code::Sha2_256).unwrap(); + let (block, _) = leaf.encode(multihash::Code::Sha2_256).unwrap(); tree[0].push(block); return tree; } @@ -279,7 +277,7 @@ mod tests { while let Some(chunk) = chunks.next().await { let chunk = chunk.unwrap(); let leaf = TreeNode::Leaf(chunk); - let (block, link_info) = leaf.encode(&multihash::Code::Sha2_256).unwrap(); + let (block, link_info) = leaf.encode(multihash::Code::Sha2_256).unwrap(); links[0].push((*block.cid(), link_info)); tree[0].push(block); } @@ -291,7 +289,7 @@ mod tests { let mut links_layer = Vec::with_capacity(count); for links in prev_layer.chunks(degree) { let stem = TreeNode::Stem(links.to_vec()); - let (block, link_info) = stem.encode(&multihash::Code::Sha2_256).unwrap(); + let (block, link_info) = stem.encode(multihash::Code::Sha2_256).unwrap(); links_layer.push((*block.cid(), link_info)); tree_layer.push(block); } @@ -355,13 +353,13 @@ mod tests { fn make_leaf(data: usize) -> (Block, LinkInfo) { TreeNode::Leaf(BytesMut::from(&data.to_be_bytes()[..]).freeze()) - .encode(&multihash::Code::Sha2_256) + .encode(multihash::Code::Sha2_256) .unwrap() } fn make_stem(links: Vec<(Cid, LinkInfo)>) -> (Block, LinkInfo) { TreeNode::Stem(links) - .encode(&multihash::Code::Sha2_256) + .encode(multihash::Code::Sha2_256) .unwrap() } diff --git a/iroh-unixfs/src/builder.rs b/iroh-unixfs/src/builder.rs index f636ea511..486da9295 100644 --- a/iroh-unixfs/src/builder.rs +++ b/iroh-unixfs/src/builder.rs @@ -93,7 +93,7 @@ impl Directory { Directory::single("".into(), Entry::Directory(self)) } - pub async fn encode_root(self, code: &multihash::Code) -> Result { + pub async fn encode_root(self, code: multihash::Code) -> Result { let mut current = None; let parts = self.encode(code); tokio::pin!(parts); @@ -105,7 +105,7 @@ impl Directory { current.expect("must not be empty") } - pub fn encode<'a>(self, code: &multihash::Code) -> BoxStream<'a, Result> { + pub fn encode<'a>(self, code: multihash::Code) -> BoxStream<'a, Result> { match self { Directory::Basic(basic) => basic.encode(code), Directory::Hamt(hamt) => hamt.encode(code), @@ -114,13 +114,12 @@ impl Directory { } impl BasicDirectory { - pub fn encode<'a>(self, code: &multihash::Code) -> BoxStream<'a, Result> { - let code = code.clone(); + pub fn encode<'a>(self, code: multihash::Code) -> BoxStream<'a, Result> { async_stream::try_stream! { let mut links = Vec::new(); for entry in self.entries { let name = entry.name().to_string(); - let parts = entry.encode(&code).await?; + let parts = entry.encode(code).await?; tokio::pin!(parts); let mut root = None; let mut size = 0u64; @@ -145,14 +144,14 @@ impl BasicDirectory { }; let outer = encode_unixfs_pb(&inner, links)?; let node = UnixfsNode::Directory(Node { outer, inner }); - yield node.encode(&code)?; + yield node.encode(code)?; } .boxed() } } impl HamtDirectory { - pub fn encode<'a>(self, code: &multihash::Code) -> BoxStream<'a, Result> { + pub fn encode<'a>(self, code: multihash::Code) -> BoxStream<'a, Result> { self.hamt.encode(code) } } @@ -264,7 +263,7 @@ impl Symlink { &self.name } - pub fn encode(self, code: &multihash::Code) -> Result { + pub fn encode(self, code: multihash::Code) -> Result { let target = self .target .to_str() @@ -463,7 +462,7 @@ impl Entry { } } - pub async fn encode(self, code: &multihash::Code) -> Result>> { + pub async fn encode(self, code: multihash::Code) -> Result>> { Ok(match self { Entry::File(f) => f.encode().await?.boxed(), Entry::Directory(d) => d.encode(code), @@ -595,7 +594,7 @@ impl DirectoryBuilder { pub async fn add_path(self, path: impl Into) -> Result { let chunker = self.chunker.clone(); - let degree = self.degree.clone(); + let degree = self.degree; Ok(self.add_entries( make_entries_from_path(path, chunker, degree) .await? @@ -680,8 +679,7 @@ impl HamtNode { } } - pub fn encode<'a>(self, code: &multihash::Code) -> BoxStream<'a, Result> { - let code = code.clone(); + pub fn encode<'a>(self, code: multihash::Code) -> BoxStream<'a, Result> { match self { Self::Branch(tree) => { async_stream::try_stream! { @@ -690,7 +688,7 @@ impl HamtNode { for (prefix, node) in tree { let name = format!("{:02X}{}", prefix, node.name()); bitfield.set_bit(prefix); - let blocks = node.encode(&code); + let blocks = node.encode(code); let mut root = None; tokio::pin!(blocks); while let Some(block) = blocks.next().await { @@ -715,11 +713,11 @@ impl HamtNode { // it does not really matter what enum variant we choose here as long as // it is not raw. The type of the node will be HamtShard from above. let node = UnixfsNode::Directory(crate::unixfs::Node { outer, inner }); - yield node.encode(&code)?; + yield node.encode(code)?; } .boxed() } - Self::Leaf(HamtLeaf(_hash, entry)) => async move { entry.encode(&code).await } + Self::Leaf(HamtLeaf(_hash, entry)) => async move { entry.encode(code).await } .try_flatten_stream() .boxed(), } @@ -862,12 +860,12 @@ mod tests { let mut baz = SymlinkBuilder::new("baz.txt"); baz.target("bat.txt"); let baz = baz.build().await?; - baz.encode(&multihash::Code::Sha2_256)? + baz.encode(multihash::Code::Sha2_256)? }; let dir = dir.add_file(bar).add_symlink(baz).build()?; - let dir_block = dir.encode_root(&multihash::Code::Sha2_256).await?; + let dir_block = dir.encode_root(multihash::Code::Sha2_256).await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; let links = decoded_dir.links().collect::>>().unwrap(); @@ -919,12 +917,12 @@ mod tests { let mut baz = SymlinkBuilder::new("baz.txt"); baz.target("bat.txt"); let baz = baz.build().await?; - baz.encode(&multihash::Code::Sha2_256)? + baz.encode(multihash::Code::Sha2_256)? }; let dir = dir.add_file(bar).add_symlink(baz).build()?; - let dir_block = dir.encode_root(&multihash::Code::Sha2_256).await?; + let dir_block = dir.encode_root(multihash::Code::Sha2_256).await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; let links = decoded_dir.links().collect::>>().unwrap(); @@ -997,7 +995,7 @@ mod tests { let dir = dir.add_file(bar).add_file(baz).build()?; - let dir_block = dir.encode_root(&multihash::Code::Sha2_256).await?; + let dir_block = dir.encode_root(multihash::Code::Sha2_256).await?; let decoded_dir = UnixfsNode::decode(dir_block.cid(), dir_block.data().clone())?; let links = decoded_dir.links().collect::>>().unwrap(); diff --git a/iroh-unixfs/src/unixfs.rs b/iroh-unixfs/src/unixfs.rs index 275802e9c..56db1226b 100644 --- a/iroh-unixfs/src/unixfs.rs +++ b/iroh-unixfs/src/unixfs.rs @@ -179,7 +179,7 @@ impl UnixfsNode { } } - pub fn encode(&self, code: &multihash::Code) -> Result { + pub fn encode(&self, code: multihash::Code) -> Result { let res = match self { UnixfsNode::Raw(data) => { let out = data.clone();