From 347d45c3de3bcba878657566a67f4e1825b03bc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Fri, 2 Aug 2024 09:07:00 +0300 Subject: [PATCH] refactor(iroh): remove flume from iroh-cli and iroh (#2543) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Description refactor(iroh): remove flume from iroh-cli and iroh Removes most usages of flume from iroh and all usages of flume from iroh-cli. ## Breaking Changes None ## Notes & open questions Note: this does not remove all usages because we need the iroh docs purge PR to be merged before that. ## Change checklist - [ ] Self-review. - [ ] ~~Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant.~~ - [ ] ~~Tests if relevant.~~ - [ ] ~~All breaking changes documented.~~ --------- Co-authored-by: Philipp Krüger Co-authored-by: Kasey Co-authored-by: Kasey Huizinga --- Cargo.lock | 2 +- iroh-blobs/src/downloader/progress.rs | 4 +- iroh-blobs/src/downloader/test.rs | 26 +++--- iroh-blobs/src/util/progress.rs | 128 ++++++++++++++++++++++++++ iroh-cli/Cargo.toml | 2 +- iroh-cli/src/commands/doctor.rs | 14 +-- iroh/src/node/rpc.rs | 99 ++++++++++---------- iroh/tests/gc.rs | 23 +++-- 8 files changed, 213 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7afd8edd7..4b06c11628 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2629,6 +2629,7 @@ name = "iroh-cli" version = "0.21.0" dependencies = [ "anyhow", + "async-channel", "bao-tree", "bytes", "clap", @@ -2640,7 +2641,6 @@ dependencies = [ "dialoguer", "dirs-next", "duct", - "flume", "futures-buffered", "futures-lite 2.3.0", "futures-util", diff --git a/iroh-blobs/src/downloader/progress.rs b/iroh-blobs/src/downloader/progress.rs index 8a0114dda2..eac80985d5 100644 --- a/iroh-blobs/src/downloader/progress.rs +++ b/iroh-blobs/src/downloader/progress.rs @@ -11,13 +11,13 @@ use parking_lot::Mutex; use crate::{ get::{db::DownloadProgress, progress::TransferState}, - util::progress::{FlumeProgressSender, IdGenerator, ProgressSendError, ProgressSender}, + util::progress::{AsyncChannelProgressSender, IdGenerator, ProgressSendError, ProgressSender}, }; use super::DownloadKind; /// The channel that can be used to subscribe to progress updates. -pub type ProgressSubscriber = FlumeProgressSender; +pub type ProgressSubscriber = AsyncChannelProgressSender; /// Track the progress of downloads. /// diff --git a/iroh-blobs/src/downloader/test.rs b/iroh-blobs/src/downloader/test.rs index 871b835ba7..2e734eaf3b 100644 --- a/iroh-blobs/src/downloader/test.rs +++ b/iroh-blobs/src/downloader/test.rs @@ -12,7 +12,7 @@ use crate::{ get::{db::BlobId, progress::TransferState}, util::{ local_pool::LocalPool, - progress::{FlumeProgressSender, IdGenerator}, + progress::{AsyncChannelProgressSender, IdGenerator}, }, }; @@ -276,13 +276,13 @@ async fn concurrent_progress() { let hash = Hash::new([0u8; 32]); let kind_1 = HashAndFormat::raw(hash); - let (prog_a_tx, prog_a_rx) = flume::bounded(64); - let prog_a_tx = FlumeProgressSender::new(prog_a_tx); + let (prog_a_tx, prog_a_rx) = async_channel::bounded(64); + let prog_a_tx = AsyncChannelProgressSender::new(prog_a_tx); let req = DownloadRequest::new(kind_1, vec![peer]).progress_sender(prog_a_tx); let handle_a = downloader.queue(req).await; - let (prog_b_tx, prog_b_rx) = flume::bounded(64); - let prog_b_tx = FlumeProgressSender::new(prog_b_tx); + let (prog_b_tx, prog_b_rx) = async_channel::bounded(64); + let prog_b_tx = AsyncChannelProgressSender::new(prog_b_tx); let req = DownloadRequest::new(kind_1, vec![peer]).progress_sender(prog_b_tx); let handle_b = downloader.queue(req).await; @@ -292,8 +292,8 @@ async fn concurrent_progress() { let mut state_b = TransferState::new(hash); let mut state_c = TransferState::new(hash); - let prog1_a = prog_a_rx.recv_async().await.unwrap(); - let prog1_b = prog_b_rx.recv_async().await.unwrap(); + let prog1_a = prog_a_rx.recv().await.unwrap(); + let prog1_b = prog_b_rx.recv().await.unwrap(); assert!(matches!(prog1_a, DownloadProgress::Found { hash, size: 100, ..} if hash == hash)); assert!(matches!(prog1_b, DownloadProgress::Found { hash, size: 100, ..} if hash == hash)); @@ -301,12 +301,12 @@ async fn concurrent_progress() { state_b.on_progress(prog1_b); assert_eq!(state_a, state_b); - let (prog_c_tx, prog_c_rx) = flume::bounded(64); - let prog_c_tx = FlumeProgressSender::new(prog_c_tx); + let (prog_c_tx, prog_c_rx) = async_channel::bounded(64); + let prog_c_tx = AsyncChannelProgressSender::new(prog_c_tx); let req = DownloadRequest::new(kind_1, vec![peer]).progress_sender(prog_c_tx); let handle_c = downloader.queue(req).await; - let prog1_c = prog_c_rx.recv_async().await.unwrap(); + let prog1_c = prog_c_rx.recv().await.unwrap(); assert!(matches!(&prog1_c, DownloadProgress::InitialState(state) if state == &state_a)); state_c.on_progress(prog1_c); @@ -317,9 +317,9 @@ async fn concurrent_progress() { res_b.unwrap(); res_c.unwrap(); - let prog_a: Vec<_> = prog_a_rx.into_stream().collect().await; - let prog_b: Vec<_> = prog_b_rx.into_stream().collect().await; - let prog_c: Vec<_> = prog_c_rx.into_stream().collect().await; + let prog_a: Vec<_> = prog_a_rx.collect().await; + let prog_b: Vec<_> = prog_b_rx.collect().await; + let prog_c: Vec<_> = prog_c_rx.collect().await; assert_eq!(prog_a.len(), 1); assert_eq!(prog_b.len(), 1); diff --git a/iroh-blobs/src/util/progress.rs b/iroh-blobs/src/util/progress.rs index 8915b1cfb2..6f1f678655 100644 --- a/iroh-blobs/src/util/progress.rs +++ b/iroh-blobs/src/util/progress.rs @@ -518,6 +518,98 @@ impl ProgressSender for FlumeProgressSender { } } +/// A progress sender that uses an async channel. +pub struct AsyncChannelProgressSender { + sender: async_channel::Sender, + id: std::sync::Arc, +} + +impl std::fmt::Debug for AsyncChannelProgressSender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AsyncChannelProgressSender") + .field("id", &self.id) + .field("sender", &self.sender) + .finish() + } +} + +impl Clone for AsyncChannelProgressSender { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + id: self.id.clone(), + } + } +} + +impl AsyncChannelProgressSender { + /// Create a new progress sender from an async channel sender. + pub fn new(sender: async_channel::Sender) -> Self { + Self { + sender, + id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)), + } + } + + /// Returns true if `other` sends on the same `async_channel` channel as `self`. + pub fn same_channel(&self, other: &AsyncChannelProgressSender) -> bool { + same_channel(&self.sender, &other.sender) + } +} + +/// Given a value that is aligned and sized like a pointer, return the value of +/// the pointer as a usize. +fn get_as_ptr(value: &T) -> Option { + use std::mem; + if mem::size_of::() == std::mem::size_of::() + && mem::align_of::() == mem::align_of::() + { + // SAFETY: size and alignment requirements are checked and met + unsafe { Some(mem::transmute_copy(value)) } + } else { + None + } +} + +fn same_channel(a: &async_channel::Sender, b: &async_channel::Sender) -> bool { + // This relies on async_channel::Sender being just a newtype wrapper around + // an Arc>, so if two senders point to the same channel, the + // pointers will be the same. + get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap() +} + +impl IdGenerator for AsyncChannelProgressSender { + fn new_id(&self) -> u64 { + self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } +} + +impl ProgressSender for AsyncChannelProgressSender { + type Msg = T; + + async fn send(&self, msg: Self::Msg) -> std::result::Result<(), ProgressSendError> { + self.sender + .send(msg) + .await + .map_err(|_| ProgressSendError::ReceiverDropped) + } + + fn try_send(&self, msg: Self::Msg) -> std::result::Result<(), ProgressSendError> { + match self.sender.try_send(msg) { + Ok(_) => Ok(()), + Err(async_channel::TrySendError::Full(_)) => Ok(()), + Err(async_channel::TrySendError::Closed(_)) => Err(ProgressSendError::ReceiverDropped), + } + } + + fn blocking_send(&self, msg: Self::Msg) -> std::result::Result<(), ProgressSendError> { + match self.sender.send_blocking(msg) { + Ok(_) => Ok(()), + Err(_) => Err(ProgressSendError::ReceiverDropped), + } + } +} + /// An error that can occur when sending progress messages. /// /// Really the only error that can occur is if the receiver is dropped. @@ -628,3 +720,39 @@ impl io::Result<()> + 'stati self.0.set_len(size).await } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + + #[test] + fn get_as_ptr_works() { + struct Wrapper(Arc); + let x = Wrapper(Arc::new(1u64)); + assert_eq!( + get_as_ptr(&x).unwrap(), + Arc::as_ptr(&x.0) as usize - 2 * std::mem::size_of::() + ); + } + + #[test] + fn get_as_ptr_wrong_use() { + struct Wrapper(#[allow(dead_code)] u8); + let x = Wrapper(1); + assert!(get_as_ptr(&x).is_none()); + } + + #[test] + fn test_sender_is_ptr() { + assert_eq!( + std::mem::size_of::(), + std::mem::size_of::>() + ); + assert_eq!( + std::mem::align_of::(), + std::mem::align_of::>() + ); + } +} diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index 6d4f0e5cbc..d4bd389a4c 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -23,6 +23,7 @@ doc = false [dependencies] anyhow = "1.0.81" +async-channel = "2.3.1" bao-tree = "0.13" bytes = "1.5.0" clap = { version = "4", features = ["derive"] } @@ -33,7 +34,6 @@ crossterm = "0.27.0" derive_more = { version = "1.0.0-beta.1", features = ["display"] } dialoguer = { version = "0.11.0", default-features = false } dirs-next = "2.0.0" -flume = "0.11.0" futures-buffered = "0.2.4" futures-lite = "2.3" futures-util = { version = "0.3.30", features = ["futures-sink"] } diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index 93e3221b6d..6f98302a52 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -23,7 +23,7 @@ use iroh::{ base::ticket::{BlobTicket, Ticket}, blobs::{ store::{ReadableStore, Store as _}, - util::progress::{FlumeProgressSender, ProgressSender}, + util::progress::{AsyncChannelProgressSender, ProgressSender}, }, docs::{Capability, DocTicket}, net::{ @@ -1145,28 +1145,28 @@ pub async fn run(command: Commands, config: &NodeConfig) -> anyhow::Result<()> { Commands::TicketInspect { ticket, zbase32 } => inspect_ticket(&ticket, zbase32), Commands::BlobConsistencyCheck { path, repair } => { let blob_store = iroh::blobs::store::fs::Store::load(path).await?; - let (send, recv) = flume::bounded(1); + let (send, recv) = async_channel::bounded(1); let task = tokio::spawn(async move { - while let Ok(msg) = recv.recv_async().await { + while let Ok(msg) = recv.recv().await { println!("{:?}", msg); } }); blob_store - .consistency_check(repair, FlumeProgressSender::new(send).boxed()) + .consistency_check(repair, AsyncChannelProgressSender::new(send).boxed()) .await?; task.await?; Ok(()) } Commands::BlobValidate { path, repair } => { let blob_store = iroh::blobs::store::fs::Store::load(path).await?; - let (send, recv) = flume::bounded(1); + let (send, recv) = async_channel::bounded(1); let task = tokio::spawn(async move { - while let Ok(msg) = recv.recv_async().await { + while let Ok(msg) = recv.recv().await { println!("{:?}", msg); } }); blob_store - .validate(repair, FlumeProgressSender::new(send).boxed()) + .validate(repair, AsyncChannelProgressSender::new(send).boxed()) .await?; task.await?; Ok(()) diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 0796a0d86e..467e91d402 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -15,13 +15,12 @@ use iroh_blobs::get::db::DownloadProgress; use iroh_blobs::get::Stats; use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry}; use iroh_blobs::util::local_pool::LocalPoolHandle; -use iroh_blobs::util::progress::ProgressSender; +use iroh_blobs::util::progress::{AsyncChannelProgressSender, ProgressSender}; use iroh_blobs::util::SetTagOption; use iroh_blobs::BlobFormat; use iroh_blobs::{ provider::AddProgress, store::{Store as BaoStore, ValidateProgress}, - util::progress::FlumeProgressSender, HashAndFormat, }; use iroh_io::AsyncSliceReader; @@ -527,18 +526,18 @@ impl Handler { self, msg: ValidateRequest, ) -> impl Stream + Send + 'static { - let (tx, rx) = flume::bounded(1); + let (tx, rx) = async_channel::bounded(1); let tx2 = tx.clone(); let db = self.inner.db.clone(); tokio::task::spawn(async move { if let Err(e) = db - .validate(msg.repair, FlumeProgressSender::new(tx).boxed()) + .validate(msg.repair, AsyncChannelProgressSender::new(tx).boxed()) .await { - tx2.send_async(ValidateProgress::Abort(e.into())).await.ok(); + tx2.send(ValidateProgress::Abort(e.into())).await.ok(); } }); - rx.into_stream() + rx } /// Invoke validate on the database and stream out the result @@ -546,59 +545,59 @@ impl Handler { self, msg: ConsistencyCheckRequest, ) -> impl Stream + Send + 'static { - let (tx, rx) = flume::bounded(1); + let (tx, rx) = async_channel::bounded(1); let tx2 = tx.clone(); let db = self.inner.db.clone(); tokio::task::spawn(async move { if let Err(e) = db - .consistency_check(msg.repair, FlumeProgressSender::new(tx).boxed()) + .consistency_check(msg.repair, AsyncChannelProgressSender::new(tx).boxed()) .await { - tx2.send_async(ConsistencyCheckProgress::Abort(e.into())) + tx2.send(ConsistencyCheckProgress::Abort(e.into())) .await .ok(); } }); - rx.into_stream() + rx } fn blob_add_from_path(self, msg: AddPathRequest) -> impl Stream { // provide a little buffer so that we don't slow down the sender - let (tx, rx) = flume::bounded(32); + let (tx, rx) = async_channel::bounded(32); let tx2 = tx.clone(); self.local_pool_handle().spawn_detached(|| async move { if let Err(e) = self.blob_add_from_path0(msg, tx).await { - tx2.send_async(AddProgress::Abort(e.into())).await.ok(); + tx2.send(AddProgress::Abort(e.into())).await.ok(); } }); - rx.into_stream().map(AddPathResponse) + rx.map(AddPathResponse) } fn doc_import_file(self, msg: ImportFileRequest) -> impl Stream { // provide a little buffer so that we don't slow down the sender - let (tx, rx) = flume::bounded(32); + let (tx, rx) = async_channel::bounded(32); let tx2 = tx.clone(); self.local_pool_handle().spawn_detached(|| async move { if let Err(e) = self.doc_import_file0(msg, tx).await { - tx2.send_async(crate::client::docs::ImportProgress::Abort(e.into())) + tx2.send(crate::client::docs::ImportProgress::Abort(e.into())) .await .ok(); } }); - rx.into_stream().map(ImportFileResponse) + rx.map(ImportFileResponse) } async fn doc_import_file0( self, msg: ImportFileRequest, - progress: flume::Sender, + progress: async_channel::Sender, ) -> anyhow::Result<()> { let docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; use crate::client::docs::ImportProgress as DocImportProgress; use iroh_blobs::store::ImportMode; use std::collections::BTreeMap; - let progress = FlumeProgressSender::new(progress); + let progress = AsyncChannelProgressSender::new(progress); let names = Arc::new(Mutex::new(BTreeMap::new())); // convert import progress to provide progress let import_progress = progress.clone().with_filter_map(move |x| match x { @@ -660,23 +659,23 @@ impl Handler { } fn doc_export_file(self, msg: ExportFileRequest) -> impl Stream { - let (tx, rx) = flume::bounded(1024); + let (tx, rx) = async_channel::bounded(1024); let tx2 = tx.clone(); self.local_pool_handle().spawn_detached(|| async move { if let Err(e) = self.doc_export_file0(msg, tx).await { - tx2.send_async(ExportProgress::Abort(e.into())).await.ok(); + tx2.send(ExportProgress::Abort(e.into())).await.ok(); } }); - rx.into_stream().map(ExportFileResponse) + rx.map(ExportFileResponse) } async fn doc_export_file0( self, msg: ExportFileRequest, - progress: flume::Sender, + progress: async_channel::Sender, ) -> anyhow::Result<()> { let _docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; - let progress = FlumeProgressSender::new(progress); + let progress = AsyncChannelProgressSender::new(progress); let ExportFileRequest { entry, path, mode } = msg; let key = bytes::Bytes::from(entry.key().to_vec()); let export_progress = progress.clone().with_map(move |mut x| { @@ -700,11 +699,11 @@ impl Handler { } fn blob_download(self, msg: BlobDownloadRequest) -> impl Stream { - let (sender, receiver) = flume::bounded(1024); + let (sender, receiver) = async_channel::bounded(1024); let db = self.inner.db.clone(); let downloader = self.inner.downloader.clone(); let endpoint = self.inner.endpoint.clone(); - let progress = FlumeProgressSender::new(sender); + let progress = AsyncChannelProgressSender::new(sender); self.local_pool_handle().spawn_detached(move || async move { if let Err(err) = download(&db, endpoint, &downloader, msg, progress.clone()).await { progress @@ -714,12 +713,12 @@ impl Handler { } }); - receiver.into_stream().map(DownloadResponse) + receiver.map(DownloadResponse) } fn blob_export(self, msg: ExportRequest) -> impl Stream { - let (tx, rx) = flume::bounded(1024); - let progress = FlumeProgressSender::new(tx); + let (tx, rx) = async_channel::bounded(1024); + let progress = AsyncChannelProgressSender::new(tx); self.local_pool_handle().spawn_detached(move || async move { let res = iroh_blobs::export::export( &self.inner.db, @@ -735,18 +734,18 @@ impl Handler { Err(err) => progress.send(ExportProgress::Abort(err.into())).await.ok(), }; }); - rx.into_stream().map(ExportResponse) + rx.map(ExportResponse) } async fn blob_add_from_path0( self, msg: AddPathRequest, - progress: flume::Sender, + progress: async_channel::Sender, ) -> anyhow::Result<()> { use iroh_blobs::store::ImportMode; use std::collections::BTreeMap; - let progress = FlumeProgressSender::new(progress); + let progress = AsyncChannelProgressSender::new(progress); let names = Arc::new(Mutex::new(BTreeMap::new())); // convert import progress to provide progress let import_progress = progress.clone().with_filter_map(move |x| match x { @@ -923,25 +922,25 @@ impl Handler { msg: AddStreamRequest, stream: impl Stream + Send + Unpin + 'static, ) -> impl Stream { - let (tx, rx) = flume::bounded(32); + let (tx, rx) = async_channel::bounded(32); let this = self.clone(); self.local_pool_handle().spawn_detached(|| async move { if let Err(err) = this.blob_add_stream0(msg, stream, tx.clone()).await { - tx.send_async(AddProgress::Abort(err.into())).await.ok(); + tx.send(AddProgress::Abort(err.into())).await.ok(); } }); - rx.into_stream().map(AddStreamResponse) + rx.map(AddStreamResponse) } async fn blob_add_stream0( self, msg: AddStreamRequest, stream: impl Stream + Send + Unpin + 'static, - progress: flume::Sender, + progress: async_channel::Sender, ) -> anyhow::Result<()> { - let progress = FlumeProgressSender::new(progress); + let progress = AsyncChannelProgressSender::new(progress); let stream = stream.map(|item| match item { AddStreamUpdate::Chunk(chunk) => Ok(chunk), @@ -993,24 +992,24 @@ impl Handler { self, req: ReadAtRequest, ) -> impl Stream> + Send + 'static { - let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP); + let (tx, rx) = async_channel::bounded(RPC_BLOB_GET_CHANNEL_CAP); let db = self.inner.db.clone(); self.local_pool_handle().spawn_detached(move || async move { if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await { - tx.send_async(RpcResult::Err(err.into())).await.ok(); + tx.send(RpcResult::Err(err.into())).await.ok(); } }); async fn read_loop( req: ReadAtRequest, db: D, - tx: flume::Sender>, + tx: async_channel::Sender>, max_chunk_size: usize, ) -> anyhow::Result<()> { let entry = db.get(&req.hash).await?; let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?; let size = entry.size(); - tx.send_async(Ok(ReadAtResponse::Entry { + tx.send(Ok(ReadAtResponse::Entry { size, is_complete: entry.is_complete(), })) @@ -1037,7 +1036,7 @@ impl Handler { let chunk = reader.read_at(req.offset + read, chunk_size).await?; let chunk_len = chunk.len(); if !chunk.is_empty() { - tx.send_async(Ok(ReadAtResponse::Data { chunk })).await?; + tx.send(Ok(ReadAtResponse::Data { chunk })).await?; } if chunk_len < chunk_size { break; @@ -1048,7 +1047,7 @@ impl Handler { Ok(()) } - rx.into_stream() + rx } fn node_connections( @@ -1056,17 +1055,15 @@ impl Handler { _: ConnectionsRequest, ) -> impl Stream> + Send + 'static { // provide a little buffer so that we don't slow down the sender - let (tx, rx) = flume::bounded(32); + let (tx, rx) = async_channel::bounded(32); let mut conn_infos = self.inner.endpoint.connection_infos(); conn_infos.sort_by_key(|n| n.node_id.to_string()); self.local_pool_handle().spawn_detached(|| async move { for conn_info in conn_infos { - tx.send_async(Ok(ConnectionsResponse { conn_info })) - .await - .ok(); + tx.send(Ok(ConnectionsResponse { conn_info })).await.ok(); } }); - rx.into_stream() + rx } // This method is called as an RPC method, which have to be async @@ -1125,7 +1122,7 @@ async fn download( endpoint: Endpoint, downloader: &Downloader, req: BlobDownloadRequest, - progress: FlumeProgressSender, + progress: AsyncChannelProgressSender, ) -> Result<()> where D: iroh_blobs::store::Store, @@ -1175,7 +1172,7 @@ async fn download_queued( downloader: &Downloader, hash_and_format: HashAndFormat, nodes: Vec, - progress: FlumeProgressSender, + progress: AsyncChannelProgressSender, ) -> Result { let mut node_ids = Vec::with_capacity(nodes.len()); let mut any_added = false; @@ -1199,7 +1196,7 @@ async fn download_direct_from_nodes( endpoint: Endpoint, hash_and_format: HashAndFormat, nodes: Vec, - progress: FlumeProgressSender, + progress: AsyncChannelProgressSender, ) -> Result where D: BaoStore, @@ -1232,7 +1229,7 @@ async fn download_direct( endpoint: Endpoint, hash_and_format: HashAndFormat, node: NodeAddr, - progress: FlumeProgressSender, + progress: AsyncChannelProgressSender, ) -> Result where D: BaoStore, diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs index e032691df9..83a21f8d7f 100644 --- a/iroh/tests/gc.rs +++ b/iroh/tests/gc.rs @@ -37,11 +37,14 @@ pub fn simulate_remote(data: &[u8]) -> (blake3::Hash, Cursor) { } /// Wrap a bao store in a node that has gc enabled. -async fn wrap_in_node(bao_store: S, gc_period: Duration) -> (Node, flume::Receiver<()>) +async fn wrap_in_node( + bao_store: S, + gc_period: Duration, +) -> (Node, async_channel::Receiver<()>) where S: iroh_blobs::store::Store, { - let (gc_send, gc_recv) = flume::unbounded(); + let (gc_send, gc_recv) = async_channel::unbounded(); let node = node::Builder::with_db_and_store( bao_store, DocsStorage::Memory, @@ -49,7 +52,7 @@ where ) .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) .register_gc_done_cb(Box::new(move || { - gc_send.send(()).ok(); + gc_send.send_blocking(()).ok(); })) .spawn() .await @@ -60,19 +63,19 @@ where async fn gc_test_node() -> ( Node, iroh_blobs::store::mem::Store, - flume::Receiver<()>, + async_channel::Receiver<()>, ) { let bao_store = iroh_blobs::store::mem::Store::new(); let (node, gc_recv) = wrap_in_node(bao_store.clone(), Duration::from_millis(500)).await; (node, bao_store, gc_recv) } -async fn step(evs: &flume::Receiver<()>) { +async fn step(evs: &async_channel::Receiver<()>) { // drain the event queue, we want a new GC while evs.try_recv().is_ok() {} // wait for several GC cycles for _ in 0..3 { - evs.recv_async().await.unwrap(); + evs.recv().await.unwrap(); } } @@ -191,7 +194,7 @@ mod file { use iroh_blobs::{ store::{BaoBatchWriter, ConsistencyCheckProgress, Map, MapEntryMut, ReportLevel}, - util::progress::{FlumeProgressSender, ProgressSender as _}, + util::progress::{AsyncChannelProgressSender, ProgressSender as _}, TempTag, }; use tokio::io::AsyncReadExt; @@ -212,16 +215,16 @@ mod file { async fn check_consistency(store: &impl Store) -> anyhow::Result { let mut max_level = ReportLevel::Trace; - let (tx, rx) = flume::bounded(1); + let (tx, rx) = async_channel::bounded(1); let task = tokio::task::spawn(async move { - while let Ok(ev) = rx.recv_async().await { + while let Ok(ev) = rx.recv().await { if let ConsistencyCheckProgress::Update { level, .. } = &ev { max_level = max_level.max(*level); } } }); store - .consistency_check(false, FlumeProgressSender::new(tx).boxed()) + .consistency_check(false, AsyncChannelProgressSender::new(tx).boxed()) .await?; task.await?; Ok(max_level)