Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use Bytes apart of unixfs operations #131

Merged
merged 6 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# 0.10.0
- refactor: Use `Bytes` apart of unixfs operations. [PR XXX](https://github.com/dariusc93/rust-ipfs/pull/XXX)
dariusc93 marked this conversation as resolved.
Show resolved Hide resolved
- refactor: Remove option and use configuration directly. [PR 129](https://github.com/dariusc93/rust-ipfs/pull/129)
- refactor: Remove option around chunker and use conversion for option arguments.
- chore: Switch from libp2p-nat to libp2p-upnp. [PR 128](https://github.com/dariusc93/rust-ipfs/pull/128)
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ chrono = { version = "0.4" }
libp2p-relay-manager = { version = "0.2.1", path = "packages/libp2p-relay-manager" }
redb = "1.3"
futures-timer = "3.0"
bytes = "1"

[dependencies]
anyhow = "1.0"
Expand All @@ -43,7 +44,7 @@ base64 = { default-features = false, features = ["alloc"], version = "0.21" }
beetle-bitswap-next = { workspace = true, optional = true }
libp2p-bitswap-next = { workspace = true, optional = true }
byteorder = { default-features = false, version = "1" }
bytes = { default-features = false, version = "1" }
bytes = { workspace = true }
libipld.workspace = true
hickory-resolver = "0.24.0"
either = { version = "1" }
Expand Down
2 changes: 1 addition & 1 deletion examples/unixfs-add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn main() -> anyhow::Result<()> {
.start()
.await?;

let mut stream = ipfs.add_file_unixfs(opt.file);
let mut stream = ipfs.add_unixfs(opt.file);

while let Some(status) = stream.next().await {
match status {
Expand Down
13 changes: 8 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use repo::{BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoInsertPin, Repo
use tokio::task::JoinHandle;
use tracing::Span;
use tracing_futures::Instrument;
use unixfs::{IpfsUnixfs, UnixfsAdd, UnixfsCat, UnixfsGet, UnixfsLs};
use unixfs::{AddOpt, IpfsUnixfs, UnixfsAdd, UnixfsCat, UnixfsGet, UnixfsLs};

use std::{
collections::{BTreeSet, HashMap, HashSet},
Expand Down Expand Up @@ -1264,16 +1264,19 @@ impl Ipfs {
/// Add a file from a path to the blockstore
///
/// To create an owned version of the stream, please use `ipfs::unixfs::add_file` directly.
#[deprecated(note = "Use `Ipfs::add_unixfs` instead")]
pub fn add_file_unixfs<P: AsRef<std::path::Path>>(&self, path: P) -> UnixfsAdd<'_> {
let path = path.as_ref();
self.unixfs().add(path, Default::default()).span(self.span.clone())
let path = path.as_ref().to_path_buf();
self.add_unixfs(path)
}

/// Add a file through a stream of data to the blockstore
///
/// To create an owned version of the stream, please use `ipfs::unixfs::add` directly.
pub fn add_unixfs<'a>(&self, stream: BoxStream<'a, std::io::Result<Vec<u8>>>) -> UnixfsAdd<'a> {
self.unixfs().add(stream, Default::default()).span(self.span.clone())
pub fn add_unixfs<'a>(&self, opt: impl Into<AddOpt<'a>>) -> UnixfsAdd<'a> {
self.unixfs()
.add(opt, Default::default())
.span(self.span.clone())
}

/// Retreive a file and saving it to a path.
Expand Down
5 changes: 3 additions & 2 deletions src/unixfs/add.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::path::{Path, PathBuf};

use crate::{repo::Repo, Block};
use bytes::Bytes;
use either::Either;
use futures::{future::BoxFuture, stream::BoxStream, FutureExt, Stream, StreamExt, TryFutureExt};
use rust_unixfs::file::adder::{Chunker, FileAdderBuilder};
Expand All @@ -24,7 +25,7 @@ pub enum AddOpt<'a> {
Stream {
name: Option<String>,
total: Option<usize>,
stream: BoxStream<'a, std::result::Result<Vec<u8>, std::io::Error>>,
stream: BoxStream<'a, std::result::Result<Bytes, std::io::Error>>,
},
}

Expand Down Expand Up @@ -87,7 +88,7 @@ pub fn add<'a>(which: Either<&Ipfs, &Repo>, options: AddOpt<'a>, opt: AddOption)
.and_then(|file| async move {
let size = file.metadata().await?.len() as usize;

let stream = ReaderStream::new(file).map(|x| x.map(|x| x.into()));
let stream = ReaderStream::new(file);

let name: Option<String> = path.file_name().map(|f| f.to_string_lossy().to_string());

Expand Down
13 changes: 7 additions & 6 deletions src/unixfs/cat.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{dag::IpldDag, repo::Repo, Block, Ipfs};
use async_stream::stream;
use bytes::Bytes;
use either::Either;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, Stream};
Expand Down Expand Up @@ -74,7 +75,7 @@ pub fn cat<'a>(
let (visit, bytes) = match visit.start(block.data()) {
Ok((bytes, _, _, visit)) => {
let bytes = if !bytes.is_empty() {
Some(bytes.to_vec())
Some(Bytes::copy_from_slice(bytes))
} else {
None
};
Expand Down Expand Up @@ -117,7 +118,7 @@ pub fn cat<'a>(
Ok((bytes, next_visit)) => {
if !bytes.is_empty() {
// TODO: manual implementation could allow returning just the slice
yield Ok(bytes.to_vec());
yield Ok(Bytes::copy_from_slice(bytes));
}

match next_visit {
Expand Down Expand Up @@ -159,7 +160,7 @@ impl From<Block> for StartingPoint {
}

pub struct UnixfsCat<'a> {
stream: BoxStream<'a, Result<Vec<u8>, TraversalFailed>>,
stream: BoxStream<'a, Result<Bytes, TraversalFailed>>,
span: Option<Span>,
}

Expand All @@ -171,7 +172,7 @@ impl<'a> UnixfsCat<'a> {
}

impl<'a> Stream for UnixfsCat<'a> {
type Item = Result<Vec<u8>, TraversalFailed>;
type Item = Result<Bytes, TraversalFailed>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
Expand All @@ -181,7 +182,7 @@ impl<'a> Stream for UnixfsCat<'a> {
}

impl<'a> std::future::IntoFuture for UnixfsCat<'a> {
type Output = Result<Vec<u8>, TraversalFailed>;
type Output = Result<Bytes, TraversalFailed>;

type IntoFuture = BoxFuture<'a, Self::Output>;

Expand All @@ -192,7 +193,7 @@ impl<'a> std::future::IntoFuture for UnixfsCat<'a> {
while let Some(bytes) = self.stream.try_next().await? {
data.extend(bytes);
}
Ok(data)
Ok(data.into())
}
.instrument(span)
.boxed()
Expand Down
59 changes: 50 additions & 9 deletions src/unixfs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
use std::{ops::Range, path::PathBuf, time::Duration};

use anyhow::Error;
use bytes::Bytes;
use either::Either;
use futures::stream::BoxStream;
use futures::{
stream::{self, BoxStream},
StreamExt,
};
use libipld::Cid;
use libp2p::PeerId;
use ll::file::FileReadFailed;
Expand All @@ -33,8 +37,8 @@ pub struct IpfsUnixfs {

pub enum AddOpt<'a> {
Path(PathBuf),
Stream(BoxStream<'a, std::io::Result<Vec<u8>>>),
StreamWithName(String, BoxStream<'a, std::io::Result<Vec<u8>>>),
Stream(BoxStream<'a, std::io::Result<Bytes>>),
StreamWithName(String, BoxStream<'a, std::io::Result<Bytes>>),
}

impl<'a> From<&'a str> for AddOpt<'a> {
Expand All @@ -61,14 +65,55 @@ impl From<PathBuf> for AddOpt<'_> {
}
}

impl From<Vec<u8>> for AddOpt<'_> {
fn from(bytes: Vec<u8>) -> Self {
let bytes: Bytes = bytes.into();
Self::from(bytes)
}
}

impl From<(String, Vec<u8>)> for AddOpt<'_> {
fn from((name, bytes): (String, Vec<u8>)) -> Self {
let bytes: Bytes = bytes.into();
Self::from((name, bytes))
}
}

impl From<Bytes> for AddOpt<'_> {
fn from(bytes: Bytes) -> Self {
let stream = stream::once(async { Ok::<_, std::io::Error>(bytes) }).boxed();
AddOpt::Stream(stream)
}
}

impl From<(String, Bytes)> for AddOpt<'_> {
fn from((name, bytes): (String, Bytes)) -> Self {
let stream = stream::once(async { Ok::<_, std::io::Error>(bytes) }).boxed();
Self::from((name, stream))
}
}

impl<'a> From<BoxStream<'a, std::io::Result<Bytes>>> for AddOpt<'a> {
fn from(stream: BoxStream<'a, std::io::Result<Bytes>>) -> Self {
AddOpt::Stream(stream)
}
}

impl<'a> From<(String, BoxStream<'a, std::io::Result<Bytes>>)> for AddOpt<'a> {
fn from((name, stream): (String, BoxStream<'a, std::io::Result<Bytes>>)) -> Self {
AddOpt::StreamWithName(name, stream)
}
}

impl<'a> From<BoxStream<'a, std::io::Result<Vec<u8>>>> for AddOpt<'a> {
fn from(stream: BoxStream<'a, std::io::Result<Vec<u8>>>) -> Self {
AddOpt::Stream(stream)
AddOpt::Stream(stream.map(|result| result.map(|data| data.into())).boxed())
}
}

impl<'a> From<(String, BoxStream<'a, std::io::Result<Vec<u8>>>)> for AddOpt<'a> {
fn from((name, stream): (String, BoxStream<'a, std::io::Result<Vec<u8>>>)) -> Self {
let stream = stream.map(|result| result.map(|data| data.into())).boxed();
AddOpt::StreamWithName(name, stream)
}
}
Expand Down Expand Up @@ -106,11 +151,7 @@ impl IpfsUnixfs {
/// Add a file from either a file or stream
///
/// To create an owned version of the stream, please use `ipfs::unixfs::add` or `ipfs::unixfs::add_file` directly.
pub fn add<'a, I: Into<AddOpt<'a>>>(
&self,
item: I,
option: AddOption,
) -> UnixfsAdd<'a> {
pub fn add<'a, I: Into<AddOpt<'a>>>(&self, item: I, option: AddOption) -> UnixfsAdd<'a> {
let item = item.into();
match item {
AddOpt::Path(path) => add_file(Either::Left(&self.ipfs), path, option),
Expand Down