diff --git a/crates/librqbit/src/merge_streams.rs b/crates/librqbit/src/merge_streams.rs index 608de9ec..35205d4f 100644 --- a/crates/librqbit/src/merge_streams.rs +++ b/crates/librqbit/src/merge_streams.rs @@ -7,8 +7,8 @@ use futures::Stream; struct MergedStreams { poll_count: AtomicU64, - s1: S1, - s2: S2, + s1: futures::stream::Fuse, + s2: futures::stream::Fuse, } pub fn merge_streams< @@ -19,10 +19,11 @@ pub fn merge_streams< s1: S1, s2: S2, ) -> impl Stream + Unpin + 'static { + use futures::stream::StreamExt; MergedStreams { poll_count: AtomicU64::new(0), - s1, - s2, + s1: s1.fuse(), + s2: s2.fuse(), } } @@ -34,8 +35,16 @@ fn poll_two + Unpin, S2: Stream + Unpin>( use futures::StreamExt; let s1p = s1.poll_next_unpin(cx); match s1p { - Poll::Ready(r) => Poll::Ready(r), - Poll::Pending => s2.poll_next_unpin(cx), + Poll::Ready(Some(v)) => Poll::Ready(Some(v)), + Poll::Ready(None) => match s2.poll_next_unpin(cx) { + Poll::Ready(Some(v)) => Poll::Ready(Some(v)), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }, + Poll::Pending => match s2.poll_next_unpin(cx) { + Poll::Ready(Some(v)) => Poll::Ready(Some(v)), + Poll::Ready(None) | Poll::Pending => Poll::Pending, + }, } } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index ff26c44d..c2505349 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -199,10 +199,22 @@ fn merge_two_optional_streams( s2: Option + Unpin + Send + 'static>, ) -> Option> { match (s1, s2) { - (Some(s1), None) => Some(Box::pin(s1)), - (None, Some(s2)) => Some(Box::pin(s2)), - (Some(s1), Some(s2)) => Some(Box::pin(merge_streams(s1, s2))), - (None, None) => None, + (Some(s1), None) => { + debug!("merge_two_optional_streams: using first"); + Some(Box::pin(s1)) + } + (None, Some(s2)) => { + debug!("merge_two_optional_streams: using second"); + Some(Box::pin(s2)) + } + (Some(s1), Some(s2)) => { + debug!("merge_two_optional_streams: using both"); + Some(Box::pin(merge_streams(s1, s2))) + } + (None, None) => { + debug!("merge_two_optional_streams: using none"); + None + } } } @@ -864,7 +876,14 @@ impl Session { info, trackers, peer_rx: Some(rx), - initial_peers: seen.into_iter().collect(), + initial_peers: { + let seen = seen.into_iter().collect_vec(); + info!(count=seen.len(), "seen"); + for peer in &seen { + debug!(?peer, "seen") + } + seen + }, } } ReadMetainfoResult::ChannelClosed { .. } => { @@ -1089,6 +1108,10 @@ impl Session { // Merge "initial_peers" and "peer_rx" into one stream. let peer_rx = merge_two_optional_streams( if !initial_peers.is_empty() { + debug!( + count = initial_peers.len(), + "merging initial peers into peer_rx" + ); Some(futures::stream::iter(initial_peers.into_iter())) } else { None diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 4bc8e6df..4c5cf0c3 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -3,8 +3,7 @@ use std::{ time::Duration, }; -use anyhow::bail; -use futures::{stream::FuturesUnordered, StreamExt}; +use anyhow::{bail, Context}; use librqbit_core::magnet::Magnet; use rand::Rng; use tokio::{ @@ -45,15 +44,14 @@ async fn test_e2e_download() { let num_servers = 128; let torrent_file_bytes = torrent_file.as_bytes().unwrap(); - let mut futs = FuturesUnordered::new(); + let mut futs = Vec::new(); // 2. Start N servers that are serving that torrent, and return their IP:port combos. // Disable DHT on each. for i in 0u8..num_servers { let torrent_file_bytes = torrent_file_bytes.clone(); - let (tx, rx) = tokio::sync::oneshot::channel(); let tempdir = tempdir.path().to_owned(); - spawn( + let fut = spawn( async move { let peer_id = TestPeerMetadata { server_id: i, @@ -81,7 +79,7 @@ async fn test_e2e_download() { }, ) .await - .unwrap(); + .context("error starting session")?; info!("started session"); @@ -95,8 +93,8 @@ async fn test_e2e_download() { }), ) .await - .unwrap(); - let h = handle.into_handle().unwrap(); + .context("error adding torrent")?; + let h = handle.into_handle().context("into_handle()")?; let mut interval = interval(Duration::from_millis(100)); info!("added torrent"); @@ -114,25 +112,38 @@ async fn test_e2e_download() { crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"), _ => bail!("broken state"), }) - .unwrap(); + .context("error checking for torrent liveness")?; if is_live { break; } } info!("torrent is live"); - tx.send(SocketAddr::new( + Ok::<_, anyhow::Error>(SocketAddr::new( std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - session.tcp_listen_port().unwrap(), + session + .tcp_listen_port() + .context("expected session.tcp_listen_port() to be set")?, )) } .instrument(error_span!("server", id = i)), ); - futs.push(timeout(Duration::from_secs(30), rx)); + futs.push(timeout(Duration::from_secs(30), fut)); } let mut peers = Vec::new(); - while let Some(addr) = futs.next().await { - peers.push(addr.unwrap().unwrap()); + for (id, peer) in futures::future::join_all(futs) + .await + .into_iter() + .enumerate() + { + let peer = peer + .with_context(|| format!("join error, server={id}")) + .unwrap() + .with_context(|| format!("timeout, server={id}")) + .unwrap() + .with_context(|| format!("server couldn't start, server={id}")) + .unwrap(); + peers.push(peer); } info!("started all servers, starting client"); diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 0fe1a818..68f4d04b 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -257,15 +257,22 @@ impl ManagedTorrent { loop { match timeout(Duration::from_secs(5), peer_rx.next()).await { Ok(Some(peer)) => { + debug!(?peer, "received peer from peer_rx"); let live = match live.upgrade() { Some(live) => live, None => return Ok(()), }; live.add_peer_if_not_seen(peer).context("torrent closed")?; } - Ok(None) => return Ok(()), + Ok(None) => { + debug!("peer_rx closed, closing peer adder"); + return Ok(()); + } // If timeout, check if the torrent is live. - Err(_) if live.strong_count() == 0 => return Ok(()), + Err(_) if live.strong_count() == 0 => { + debug!("timed out waiting for peers, torrent isn't live, closing peer adder"); + return Ok(()); + } Err(_) => continue, } } @@ -313,6 +320,8 @@ impl ManagedTorrent { let live = TorrentStateLive::new(paused, tx, live_cancellation_token)?; g.state = ManagedTorrentState::Live(live.clone()); + drop(g); + t.state_change_notify.notify_waiters(); spawn_fatal_errors_receiver(&t, rx, token); @@ -336,6 +345,8 @@ impl ManagedTorrent { let (tx, rx) = tokio::sync::oneshot::channel(); let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone())?; g.state = ManagedTorrentState::Live(live.clone()); + drop(g); + spawn_fatal_errors_receiver(self, rx, live_cancellation_token); spawn_peer_adder(&live, peer_rx); Ok(()) @@ -347,9 +358,10 @@ impl ManagedTorrent { self.storage_factory.create_and_init(self.info())?, )); g.state = ManagedTorrentState::Initializing(initializing.clone()); - self.state_change_notify.notify_waiters(); drop(g); + self.state_change_notify.notify_waiters(); + // Recurse. self.start( peer_rx,