Skip to content

Commit

Permalink
Merge pull request #196 from ikatson/fix-e2e-test
Browse files Browse the repository at this point in the history
Fix a bug in merge_two_streams
  • Loading branch information
ikatson authored Aug 18, 2024
2 parents 2ee8366 + d028e2e commit a9df633
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 28 deletions.
21 changes: 15 additions & 6 deletions crates/librqbit/src/merge_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use futures::Stream;

struct MergedStreams<S1, S2> {
poll_count: AtomicU64,
s1: S1,
s2: S2,
s1: futures::stream::Fuse<S1>,
s2: futures::stream::Fuse<S2>,
}

pub fn merge_streams<
Expand All @@ -19,10 +19,11 @@ pub fn merge_streams<
s1: S1,
s2: S2,
) -> impl Stream<Item = I> + Unpin + 'static {
use futures::stream::StreamExt;
MergedStreams {
poll_count: AtomicU64::new(0),
s1,
s2,
s1: s1.fuse(),
s2: s2.fuse(),
}
}

Expand All @@ -34,8 +35,16 @@ fn poll_two<I, S1: Stream<Item = I> + Unpin, S2: Stream<Item = I> + Unpin>(
use futures::StreamExt;
let s1p = s1.poll_next_unpin(cx);
match s1p {
Poll::Ready(r) => Poll::Ready(r),
Poll::Pending => s2.poll_next_unpin(cx),
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
Poll::Ready(None) => match s2.poll_next_unpin(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
},
Poll::Pending => match s2.poll_next_unpin(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
},
}
}

Expand Down
33 changes: 28 additions & 5 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,22 @@ fn merge_two_optional_streams<T>(
s2: Option<impl Stream<Item = T> + Unpin + Send + 'static>,
) -> Option<BoxStream<'static, T>> {
match (s1, s2) {
(Some(s1), None) => Some(Box::pin(s1)),
(None, Some(s2)) => Some(Box::pin(s2)),
(Some(s1), Some(s2)) => Some(Box::pin(merge_streams(s1, s2))),
(None, None) => None,
(Some(s1), None) => {
debug!("merge_two_optional_streams: using first");
Some(Box::pin(s1))
}
(None, Some(s2)) => {
debug!("merge_two_optional_streams: using second");
Some(Box::pin(s2))
}
(Some(s1), Some(s2)) => {
debug!("merge_two_optional_streams: using both");
Some(Box::pin(merge_streams(s1, s2)))
}
(None, None) => {
debug!("merge_two_optional_streams: using none");
None
}
}
}

Expand Down Expand Up @@ -864,7 +876,14 @@ impl Session {
info,
trackers,
peer_rx: Some(rx),
initial_peers: seen.into_iter().collect(),
initial_peers: {
let seen = seen.into_iter().collect_vec();
info!(count=seen.len(), "seen");
for peer in &seen {
debug!(?peer, "seen")
}
seen
},
}
}
ReadMetainfoResult::ChannelClosed { .. } => {
Expand Down Expand Up @@ -1089,6 +1108,10 @@ impl Session {
// Merge "initial_peers" and "peer_rx" into one stream.
let peer_rx = merge_two_optional_streams(
if !initial_peers.is_empty() {
debug!(
count = initial_peers.len(),
"merging initial peers into peer_rx"
);
Some(futures::stream::iter(initial_peers.into_iter()))
} else {
None
Expand Down
39 changes: 25 additions & 14 deletions crates/librqbit/src/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use std::{
time::Duration,
};

use anyhow::bail;
use futures::{stream::FuturesUnordered, StreamExt};
use anyhow::{bail, Context};
use librqbit_core::magnet::Magnet;
use rand::Rng;
use tokio::{
Expand Down Expand Up @@ -45,15 +44,14 @@ async fn test_e2e_download() {
let num_servers = 128;

let torrent_file_bytes = torrent_file.as_bytes().unwrap();
let mut futs = FuturesUnordered::new();
let mut futs = Vec::new();

// 2. Start N servers that are serving that torrent, and return their IP:port combos.
// Disable DHT on each.
for i in 0u8..num_servers {
let torrent_file_bytes = torrent_file_bytes.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
let tempdir = tempdir.path().to_owned();
spawn(
let fut = spawn(
async move {
let peer_id = TestPeerMetadata {
server_id: i,
Expand Down Expand Up @@ -81,7 +79,7 @@ async fn test_e2e_download() {
},
)
.await
.unwrap();
.context("error starting session")?;

info!("started session");

Expand All @@ -95,8 +93,8 @@ async fn test_e2e_download() {
}),
)
.await
.unwrap();
let h = handle.into_handle().unwrap();
.context("error adding torrent")?;
let h = handle.into_handle().context("into_handle()")?;
let mut interval = interval(Duration::from_millis(100));

info!("added torrent");
Expand All @@ -114,25 +112,38 @@ async fn test_e2e_download() {
crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"),
_ => bail!("broken state"),
})
.unwrap();
.context("error checking for torrent liveness")?;
if is_live {
break;
}
}
info!("torrent is live");
tx.send(SocketAddr::new(
Ok::<_, anyhow::Error>(SocketAddr::new(
std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
session.tcp_listen_port().unwrap(),
session
.tcp_listen_port()
.context("expected session.tcp_listen_port() to be set")?,
))
}
.instrument(error_span!("server", id = i)),
);
futs.push(timeout(Duration::from_secs(30), rx));
futs.push(timeout(Duration::from_secs(30), fut));
}

let mut peers = Vec::new();
while let Some(addr) = futs.next().await {
peers.push(addr.unwrap().unwrap());
for (id, peer) in futures::future::join_all(futs)
.await
.into_iter()
.enumerate()
{
let peer = peer
.with_context(|| format!("join error, server={id}"))
.unwrap()
.with_context(|| format!("timeout, server={id}"))
.unwrap()
.with_context(|| format!("server couldn't start, server={id}"))
.unwrap();
peers.push(peer);
}

info!("started all servers, starting client");
Expand Down
18 changes: 15 additions & 3 deletions crates/librqbit/src/torrent_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,22 @@ impl ManagedTorrent {
loop {
match timeout(Duration::from_secs(5), peer_rx.next()).await {
Ok(Some(peer)) => {
debug!(?peer, "received peer from peer_rx");
let live = match live.upgrade() {
Some(live) => live,
None => return Ok(()),
};
live.add_peer_if_not_seen(peer).context("torrent closed")?;
}
Ok(None) => return Ok(()),
Ok(None) => {
debug!("peer_rx closed, closing peer adder");
return Ok(());
}
// If timeout, check if the torrent is live.
Err(_) if live.strong_count() == 0 => return Ok(()),
Err(_) if live.strong_count() == 0 => {
debug!("timed out waiting for peers, torrent isn't live, closing peer adder");
return Ok(());
}
Err(_) => continue,
}
}
Expand Down Expand Up @@ -313,6 +320,8 @@ impl ManagedTorrent {
let live =
TorrentStateLive::new(paused, tx, live_cancellation_token)?;
g.state = ManagedTorrentState::Live(live.clone());
drop(g);

t.state_change_notify.notify_waiters();

spawn_fatal_errors_receiver(&t, rx, token);
Expand All @@ -336,6 +345,8 @@ impl ManagedTorrent {
let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone())?;
g.state = ManagedTorrentState::Live(live.clone());
drop(g);

spawn_fatal_errors_receiver(self, rx, live_cancellation_token);
spawn_peer_adder(&live, peer_rx);
Ok(())
Expand All @@ -347,9 +358,10 @@ impl ManagedTorrent {
self.storage_factory.create_and_init(self.info())?,
));
g.state = ManagedTorrentState::Initializing(initializing.clone());
self.state_change_notify.notify_waiters();
drop(g);

self.state_change_notify.notify_waiters();

// Recurse.
self.start(
peer_rx,
Expand Down

0 comments on commit a9df633

Please sign in to comment.