diff --git a/iroh-resolver/src/balanced_tree.rs b/iroh-resolver/src/balanced_tree.rs index 825f491be71..54d8b6f4d74 100644 --- a/iroh-resolver/src/balanced_tree.rs +++ b/iroh-resolver/src/balanced_tree.rs @@ -2,42 +2,23 @@ use std::collections::VecDeque; use anyhow::Result; use async_stream::try_stream; +use bytes::{Bytes, BytesMut}; +use cid::Cid; use futures::{Stream, StreamExt}; -pub enum UnixfsNode { - Raw(usize), - File(Vec), -} - -impl UnixfsNode { - fn cid(&self) -> usize { - match self { - UnixfsNode::Raw(cid) => *cid, - UnixfsNode::File(cids) => cids.iter().sum(), - } - } -} - -impl std::fmt::Display for UnixfsNode { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - UnixfsNode::Raw(cid) => write!(f, "{}", cid), - UnixfsNode::File(cids) => { - write!(f, "{:?} - {}", cids, self.cid()) - } - } - } -} +use crate::unixfs::{dag_pb, unixfs_pb, DataType, Node, UnixfsNode}; +use crate::unixfs_builder::encode_unixfs_pb; // 3 root // 2 nextparent0 // 1 parent0 parent1 ... parent7 // 0 [0, 1, 2, 3, 4, 5, 6, 7] [0, ... ] +// -pub fn build_stream( - in_stream: impl Stream, -) -> impl Stream> { - const MAX_DEGREES: usize = 3; +pub fn stream_balanced_tree( + in_stream: impl Stream>, + max_degrees: usize, +) -> impl Stream> { try_stream! { // vec![ vec![] ] // .. 
@@ -55,20 +36,22 @@ pub fn build_stream( // vec![ vec![], vec![p0, p1, p2, p3, p4, p5, p6, p7], vec![] ] // vec![ vec![8], vec![p8], vec![pp0] ] - let mut tree: VecDeque> = VecDeque::new(); - tree.push_back(Vec::with_capacity(MAX_DEGREES)); + let mut tree: VecDeque> = VecDeque::new(); + tree.push_back(Vec::with_capacity(max_degrees)); tokio::pin!(in_stream); while let Some(chunk) = in_stream.next().await { + let chunk = chunk?; + let chunk = chunk.freeze(); let tree_len = tree.len(); // check if the leaf node of the tree is full - if tree[0].len() == MAX_DEGREES { + if tree[0].len() == max_degrees { // if so, iterater through nodes for i in 0..tree_len { // if we encounter any nodes that are not full, break - if tree[i].len() < MAX_DEGREES { + if tree[i].len() < max_degrees { break; } @@ -80,30 +63,50 @@ pub fn build_stream( // create node, keeping the cid let links = std::mem::replace(&mut tree[i], Vec::new()); - let node = UnixfsNode::File(links); - let cid = node.cid(); - yield node; + let (cid, bytes) = TreeNode::Stem(links).encode()?; + let len = bytes.len(); + yield (cid, bytes); // add cid to parent node - tree[i+1].push(cid); + tree[i+1].push((cid, len)); } + // at this point the tree will be able to receive new links + // without "overflowing", aka the leaf node and stem nodes + // have fewer than max_degrees number of links } // now that we know the tree is in a "healthy" state to // recieve more links, add the link to the tree - let raw = UnixfsNode::Raw(chunk); - tree[0].push(raw.cid()); - yield raw; + let (cid, bytes) = TreeNode::Leaf(chunk).encode()?; + let len = bytes.len(); + tree[0].push((cid, len)); + yield (cid, bytes); + // at this point, the leaf node may have max_degrees number of + // links, but no other stem node will + } + + // our stream had 1 chunk that we have already yielded + if tree.len() == 1 && tree[0].len() == 1 { + return } - // yield not filled subtrees + // clean up, aka yield the rest of the stem nodes + // since all the stem 
nodes are able to receive links + // we don't have to worry about "overflow" while let Some(links) = tree.pop_front() { - let node = UnixfsNode::File(links); - let cid = node.cid(); - yield node; + // root node + // if tree.len() == 0 { + // let (cid, bytes) = TreeNode::Root(links).encode()?; + // yield (cid, bytes); + // return + // } + + let (cid, bytes) = TreeNode::Stem(links).encode()?; + let len = bytes.len(); + yield (cid, bytes); if let Some(front) = tree.front_mut() { - front.push(cid); + front.push((cid, len)); } else { // final root, nothing to do } @@ -111,19 +114,267 @@ } } +fn create_unixfs_node_from_links(links: Vec<(Cid, usize)>, is_root: bool) -> Result { + let links = links + .into_iter() + .map(|(cid, len)| dag_pb::PbLink { + hash: Some(cid.to_bytes()), + // TODO: notes say that leaf chunks have "", does that mean that + // stem nodes should have None? + name: Some("".into()), + tsize: Some(len as u64), + }) + .collect(); + + // PBNode.Data + let inner = unixfs_pb::Data { + r#type: DataType::File as i32, + ..Default::default() + }; + + // create PBNode + let outer = encode_unixfs_pb(&inner, links)?; + + if is_root { + return Ok(UnixfsNode::Directory(Node { inner, outer })); + } + + // create UnixfsNode + Ok(UnixfsNode::File(Node { inner, outer })) +} + +// Leaf and Stem nodes are the two types of nodes that can exist in the tree +// Leaf nodes encode to `UnixfsNode::Raw` +// Stem nodes encode to `UnixfsNode::File` +// Root nodes encode to `UnixfsNode::Directory` +enum TreeNode { + Leaf(Bytes), + Stem(Vec<(Cid, usize)>), + // Root(Vec<(Cid, usize)>), +} + +impl TreeNode { + fn encode(self) -> Result<(Cid, Bytes)> { + match self { + TreeNode::Leaf(bytes) => { + // TODO: notes say that balanced in go-ipfs marks leaves as `File`, not `Raw`, should + // I change this? 
+ let node = UnixfsNode::Raw(bytes); + node.encode() + } + TreeNode::Stem(cids) => { + let node = create_unixfs_node_from_links(cids, false)?; + node.encode() + } // TreeNode::Root(cids) => { + // let node = create_unixfs_node_from_links(cids, true)?; + // node.encode() + // } + } + } +} + #[cfg(test)] mod tests { use super::*; + fn test_chunk_stream(num_chunks: usize) -> impl Stream> { + async_stream::try_stream! { + for n in 0..num_chunks { + let bytes = BytesMut::from(&n.to_be_bytes()[..]); + yield bytes + } + } + } + + async fn ensure_equal( + expect: Vec<(Cid, Bytes)>, + got: impl Stream>, + ) { + let mut i = 0; + tokio::pin!(got); + while let Some(node) = got.next().await { + let (expect_cid, expect_bytes) = expect + .get(i) + .expect("too many nodes in balanced tree stream"); + let node = node.expect("unexpected error in balanced tree stream"); + let (got_cid, got_bytes) = node; + println!("node index {}", i); + assert_eq!(*expect_cid, got_cid); + assert_eq!(*expect_bytes, got_bytes); + i += 1; + } + if expect.len() != i { + panic!( + "expected at {} nodes of the stream, got {}", + expect.len(), + i + ); + } + } + + #[tokio::test] + async fn balanced_tree_test_leaf() { + let chunk_stream = test_chunk_stream(1); + tokio::pin!(chunk_stream); + + // "manually" build expected stream output + let mut expect = Vec::new(); + let chunk = chunk_stream.next().await.unwrap().unwrap().freeze(); + let node = TreeNode::Leaf(chunk.clone()); + let (cid, bytes) = node.encode().unwrap(); + expect.push((cid, bytes.clone())); + let got = stream_balanced_tree(test_chunk_stream(1), 3); + tokio::pin!(got); + ensure_equal(expect, got).await; + } + + #[tokio::test] + async fn balanced_tree_test_height_one() { + let num_chunks = 3; + let degrees = 3; + let chunk_stream = test_chunk_stream(num_chunks); + tokio::pin!(chunk_stream); + + // "manually" build expected stream output + let mut expect = Vec::new(); + let mut links = Vec::new(); + while let Some(chunk) = 
chunk_stream.next().await { + let chunk = chunk.unwrap().freeze(); + let node = TreeNode::Leaf(chunk); + let (cid, bytes) = node.encode().unwrap(); + links.push((cid, bytes.len())); + expect.push((cid, bytes)); + } + // let root = TreeNode::Root(links); + let root = TreeNode::Stem(links); + expect.push(root.encode().unwrap()); + + let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + tokio::pin!(got); + ensure_equal(expect, got).await; + } + + #[tokio::test] + async fn balanced_tree_test_height_two_full() { + let degrees = 3; + let num_chunks = 9; + let chunk_stream = test_chunk_stream(num_chunks); + tokio::pin!(chunk_stream); + + // "manually" build expected stream output + let mut expect = Vec::new(); + let mut links_one = Vec::new(); + let mut links_two = Vec::new(); + while let Some(chunk) = chunk_stream.next().await { + let chunk = chunk.unwrap().freeze(); + let node = TreeNode::Leaf(chunk); + let (cid, bytes) = node.encode().unwrap(); + links_one.push((cid, bytes.len())); + expect.push((cid, bytes)); + if links_one.len() == degrees { + let links = std::mem::take(&mut links_one); + let stem = TreeNode::Stem(links); + let (cid, bytes) = stem.encode().unwrap(); + links_two.push((cid, bytes.len())); + expect.push((cid, bytes)); + } + } + // let root = TreeNode::Root(links_two); + let root = TreeNode::Stem(links_two); + expect.push(root.encode().unwrap()); + + let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + tokio::pin!(got); + ensure_equal(expect, got).await; + } + #[tokio::test] - async fn print_test() { - let len = 78; - let v: Vec = (1..=len).collect(); - let stream = futures::stream::iter(v); - let unixfs_node_stream = build_stream(stream); - tokio::pin!(unixfs_node_stream); - while let Some(node) = unixfs_node_stream.next().await { - println!("{}", node.unwrap()); + async fn balanced_tree_test_height_two_not_full() { + let num_chunks = 7; + let degrees = 3; + let chunk_stream = test_chunk_stream(num_chunks); + 
tokio::pin!(chunk_stream); + + // "manually" build expected stream output + let mut expect = Vec::new(); + let mut links_one = Vec::new(); + let mut links_two = Vec::new(); + while let Some(chunk) = chunk_stream.next().await { + let chunk = chunk.unwrap().freeze(); + let node = TreeNode::Leaf(chunk); + let (cid, bytes) = node.encode().unwrap(); + links_one.push((cid, bytes.len())); + expect.push((cid, bytes)); + if links_one.len() == degrees { + let links = std::mem::take(&mut links_one); + let stem = TreeNode::Stem(links); + let (cid, bytes) = stem.encode().unwrap(); + links_two.push((cid, bytes.len())); + expect.push((cid, bytes)); + } } + let stem = TreeNode::Stem(links_one); + let (cid, bytes) = stem.encode().unwrap(); + links_two.push((cid, bytes.len())); + expect.push((cid, bytes)); + // let root = TreeNode::Root(links_two); + let root = TreeNode::Stem(links_two); + expect.push(root.encode().unwrap()); + + let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + tokio::pin!(got); + ensure_equal(expect, got).await; + } + + #[tokio::test] + async fn balanced_tree_test_height_three() { + let num_chunks = 17; + let degrees = 3; + let chunk_stream = test_chunk_stream(num_chunks); + tokio::pin!(chunk_stream); + + // "manually" build expected stream output + let mut expect = Vec::new(); + let mut links_one = Vec::new(); + let mut links_two = Vec::new(); + let mut links_three = Vec::new(); + while let Some(chunk) = chunk_stream.next().await { + let chunk = chunk.unwrap().freeze(); + let node = TreeNode::Leaf(chunk); + let (cid, bytes) = node.encode().unwrap(); + links_one.push((cid, bytes.len())); + expect.push((cid, bytes)); + if links_one.len() == degrees { + let links = std::mem::take(&mut links_one); + let stem = TreeNode::Stem(links); + let (cid, bytes) = stem.encode().unwrap(); + links_two.push((cid, bytes.len())); + expect.push((cid, bytes)); + if links_two.len() == degrees { + let links = std::mem::take(&mut links_two); + let stem = 
TreeNode::Stem(links); + let (cid, bytes) = stem.encode().unwrap(); + links_three.push((cid, bytes.len())); + expect.push((cid, bytes)); + } + } + } + let one = TreeNode::Stem(links_one); + let (cid, bytes) = one.encode().unwrap(); + links_two.push((cid, bytes.len())); + expect.push((cid, bytes)); + + let two = TreeNode::Stem(links_two); + let (cid, bytes) = two.encode().unwrap(); + links_three.push((cid, bytes.len())); + expect.push((cid, bytes)); + + // let root = TreeNode::Root(links_three); + let root = TreeNode::Stem(links_three); + expect.push(root.encode().unwrap()); + + let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + tokio::pin!(got); + ensure_equal(expect, got).await; } } diff --git a/iroh-resolver/src/unixfs.rs b/iroh-resolver/src/unixfs.rs index a756d0b6062..63e89ce4b0b 100644 --- a/iroh-resolver/src/unixfs.rs +++ b/iroh-resolver/src/unixfs.rs @@ -116,17 +116,12 @@ pub enum HamtHashFunction { } #[derive(Debug, PartialEq, Clone)] -// Node is a convenient representation of a node in the tree -// the "outer" PbNode actually contains a reference to the "inner" unixfs_pb::Data -// but is represented as "flat" here pub struct Node { pub(super) outer: dag_pb::PbNode, pub(super) inner: unixfs_pb::Data, } impl Node { - // encode the "outer" PbNode, which actually contains a reference to the "inner" unixfs_pb data - // node fn encode(&self) -> Result { let bytes = self.outer.encode_to_vec(); Ok(bytes.into()) @@ -215,14 +210,25 @@ impl UnixfsNode { } } - pub fn encode(&self) -> Result { - let out = match self { - UnixfsNode::Raw(data) => data.clone(), + pub fn encode(&self) -> Result<(Cid, Bytes)> { + let (cid, out) = match self { + UnixfsNode::Raw(data) => { + let out = data.clone(); + let cid = Cid::new_v1(Codec::Raw as _, cid::multihash::Code::Sha2_256.digest(&out)); + (cid, out) + } UnixfsNode::RawNode(node) | UnixfsNode::Directory(node) | UnixfsNode::File(node) | UnixfsNode::Symlink(node) - | UnixfsNode::HamtShard(node, _) => 
node.encode()?, + | UnixfsNode::HamtShard(node, _) => { + let out = node.encode()?; + let cid = Cid::new_v1( + Codec::DagPb as _, + cid::multihash::Code::Sha2_256.digest(&out), + ); + (cid, out) + } }; ensure!( @@ -231,15 +237,7 @@ impl UnixfsNode { out.len() ); - Ok(out) - } - - pub fn calculate_cid(&self) -> Result { - let bytes = self.encode()?; - Ok(Cid::new_v1( - Codec::DagPb as _, - cid::multihash::Code::Sha2_256.digest(&bytes), - )) + Ok((cid, out)) } pub const fn typ(&self) -> Option { @@ -401,7 +399,7 @@ pub enum UnixfsContentReader { } impl UnixfsContentReader { - /// Returrns the size in bytes, if known in advance. + /// Returns the size in bytes, if known in advance. pub fn size(&self) -> Option { match self { UnixfsContentReader::File { root_node, .. } => { diff --git a/iroh-resolver/src/unixfs_builder.rs b/iroh-resolver/src/unixfs_builder.rs index f2e0cd9f4cc..60fba69c59f 100644 --- a/iroh-resolver/src/unixfs_builder.rs +++ b/iroh-resolver/src/unixfs_builder.rs @@ -2,7 +2,7 @@ use std::{fmt::Debug, path::Path, pin::Pin}; use anyhow::{anyhow, ensure, Result}; use async_trait::async_trait; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use cid::{multihash::MultihashDigest, Cid}; use futures::{Stream, StreamExt}; use iroh_rpc_client::Client; @@ -10,6 +10,7 @@ use prost::Message; use tokio::io::AsyncRead; use crate::{ + balanced_tree::stream_balanced_tree, chunker::{self, Chunker, DEFAULT_CHUNK_SIZE_LIMIT}, codecs::Codec, unixfs::{dag_pb, unixfs_pb, DataType, Node, UnixfsNode}, @@ -99,9 +100,7 @@ impl Directory { let outer = encode_unixfs_pb(&inner, links)?; let node = UnixfsNode::Directory(Node { outer, inner }); - let bytes = node.encode()?; - let cid = Cid::new_v1(Codec::DagPb as _, cid::multihash::Code::Sha2_256.digest(&bytes)); - yield (cid, bytes) + yield node.encode()?; } } } @@ -132,7 +131,7 @@ impl Debug for FileBuilder { /// Representation of a constructed File. 
pub struct File { name: String, - nodes: Pin>>>, + nodes: Pin>>>, } impl Debug for File { @@ -198,70 +197,16 @@ impl File { } pub fn encode(self) -> impl Stream> { - async_stream::try_stream! { - let mut cids = Vec::new(); - let nodes = self.nodes; - tokio::pin!(nodes); - - while let Some(node) = nodes.next().await { - let node = node?; - let bytes = node.encode()?; - let cid = Cid::new_v1(Codec::Raw as _, cid::multihash::Code::Sha2_256.digest(&bytes)); - cids.push((cid, bytes.len())); - yield (cid, bytes); - } - - if cids.len() > 1 { - // yield root as last element, as we now have all links - let links = cids.into_iter().map(|(cid, len)| { - dag_pb::PbLink { - hash: Some(cid.to_bytes()), - name: Some("".into()), - tsize: Some(len as u64), - } - }).collect(); - - let inner = unixfs_pb::Data { - r#type: DataType::File as i32, - ..Default::default() - }; - let outer = encode_unixfs_pb(&inner, links)?; - let node = UnixfsNode::Directory(Node { outer, inner }); - let bytes = node.encode()?; - let cid = Cid::new_v1(Codec::DagPb as _, cid::multihash::Code::Sha2_256.digest(&bytes)); - yield (cid, bytes) - } - } + // "The default max width is 174": https://github.com/ipfs/specs/blob/main/UNIXFS.md#layout + const MAX_DEGREES: usize = 174; + stream_balanced_tree(self.nodes, MAX_DEGREES) } } -// fn create_link_node(cids: Vec<(Cid, usize)>) -> Result { -// let links = cids -// .into_iter() -// .map(|(cid, len)| dag_pb::PbLink { -// hash: Some(cid.to_bytes()), -// name: Some("".into()), -// tsize: Some(len as u64), -// }) -// .collect(); - -// // PBNode.Data -// let data = unixfs_pb::Data { -// r#type: DataType::File as i32, -// ..Default::default() -// }; - -// // create PBNode -// let pb_node = encode_unixfs_pb(&data, links)?; - -// // create UnixfsNode -// Ok(UnixfsNode::File(Node { -// inner: data, -// outer: pb_node, -// })) -// } - -fn encode_unixfs_pb(inner: &unixfs_pb::Data, links: Vec) -> Result { +pub(crate) fn encode_unixfs_pb( + inner: &unixfs_pb::Data, + links: 
Vec, +) -> Result { let data = inner.encode_to_vec(); ensure!( data.len() <= DEFAULT_CHUNK_SIZE_LIMIT, @@ -313,17 +258,9 @@ impl FileBuilder { let name = self.name.ok_or_else(|| anyhow!("missing name"))?; let reader = self.content.ok_or_else(|| anyhow!("missing content"))?; - let nodes = self.chunker.chunks(reader).map(|chunk| match chunk { - Ok(chunk) => { - let data = chunk.freeze(); - Ok(UnixfsNode::Raw(data)) - } - Err(err) => Err(err.into()), - }); - Ok(File { name, - nodes: Box::pin(nodes), + nodes: Box::pin(self.chunker.chunks(reader)), }) } } diff --git a/iroh-share/src/lib.rs b/iroh-share/src/lib.rs index e82b33adefc..86365aef1ec 100644 --- a/iroh-share/src/lib.rs +++ b/iroh-share/src/lib.rs @@ -101,6 +101,7 @@ mod tests { // Check progress { + println!("waiting for progress"); let progress = receiver_transfer.progress()?; let progress: Vec<_> = progress.try_collect().await.unwrap(); assert_eq!(progress.len(), 22);