diff --git a/.gitignore b/.gitignore index 212de442..761f1e64 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target -.DS_Store \ No newline at end of file +.DS_Store +/.vscode \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index dc6e86ef..00000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "editor.defaultFormatter": "esbenp.prettier-vscode", - "[javascript]": { - "editor.defaultFormatter": "esbenp.prettier-vscode" - }, - "[rust]": { - "editor.defaultFormatter": "rust-lang.rust-analyzer" - } -} diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 57c4d8bb..dbd82746 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -170,7 +170,7 @@ impl HttpApi { let range_header = headers.get(http::header::RANGE); trace!(torrent_id=idx, file_id=file_id, range=?range_header, "request for HTTP stream"); - if let Some(range) = headers.get(http::header::RANGE) { + if let Some(range) = range_header { let offset: Option = range .to_str() .ok() diff --git a/crates/librqbit/src/tests/e2e_stream.rs b/crates/librqbit/src/tests/e2e_stream.rs index 97c65ead..8ba4f7a3 100644 --- a/crates/librqbit/src/tests/e2e_stream.rs +++ b/crates/librqbit/src/tests/e2e_stream.rs @@ -1,6 +1,7 @@ use std::{net::SocketAddr, time::Duration}; use anyhow::Context; +use tempfile::TempDir; use tokio::{io::AsyncReadExt, time::timeout}; use tracing::info; @@ -22,7 +23,7 @@ async fn e2e_stream() -> anyhow::Result<()> { let orig_content = std::fs::read(files.path().join("0.data")).unwrap(); let server_session = Session::new_with_opts( - "/does-not-matter".into(), + files.path().into(), crate::SessionOptions { disable_dht: true, persistence: false, @@ -63,8 +64,10 @@ async fn e2e_stream() -> anyhow::Result<()> { server_session.tcp_listen_port().unwrap(), ); + let client_dir = TempDir::with_prefix("test_e2e_stream_client")?; + let client_session = Session::new_with_opts( - "/does-not-matter".into(), + client_dir.path().into(), crate::SessionOptions { disable_dht: true, persistence: false, @@ -98,6 +101,8 @@ async fn e2e_stream() -> anyhow::Result<()> { let mut buf = Vec::::with_capacity(8192); stream.read_to_end(&mut buf).await?; + assert_eq!(buf.len(), orig_content.len(), "sizes differ"); + if buf != orig_content { panic!("contents differ") } diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 48b4b3ed..a024b20f 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -1,6 +1,7 @@ use std::{ collections::VecDeque, io::SeekFrom, + pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -9,11 +10,16 @@ use std::{ }; use anyhow::Context; +use bytes::BytesMut; use dashmap::DashMap; +use futures::Future; use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex}; -use tokio::io::{AsyncRead, AsyncSeek}; -use tracing::{debug, trace}; +use tokio::{ + io::{AsyncRead, AsyncSeek}, + task::{spawn_blocking, JoinHandle}, +}; +use tracing::{debug, error, trace}; use crate::{file_info::FileInfo, storage::TorrentStorage, ManagedTorrent}; @@ -136,6 +142,10 @@ pub struct FileStream { // file params file_len: u64, file_torrent_abs_offset: u64, + + // for AsyncRead + future: Option>>, + buffer: Option, } macro_rules! map_io_err { @@ -163,7 +173,6 @@ impl AsyncRead for FileStream { cx: &mut std::task::Context<'_>, tbuf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { - // if the file is over, return 0 if self.position == self.file_len { trace!( stream_id = self.stream_id, @@ -179,49 +188,127 @@ impl AsyncRead for FileStream { .lengths .compute_current_piece(self.position, self.file_torrent_abs_offset) .context("invalid position")); - + let initial_buf_size = tbuf.filled().len(); // if the piece is not there, register to wake when it is // check if we have the piece for real let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| { let have = ct.get_have_pieces()[current.id.get() as usize]; - if !have { - self.streams - .register_waker(self.stream_id, cx.waker().clone()); - } + self.streams + .register_waker(self.stream_id, cx.waker().clone()); have })); if !have { - trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, "poll pending, not have"); + trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, "poll pending, not have piece yet"); return Poll::Pending; } - // actually stream the piece - let buf = tbuf.initialize_unfilled(); - let file_remaining = self.file_len - self.position; - let bytes_to_read: usize = poll_try_io!((buf.len() as u64) - .min(current.piece_remaining as u64) - .min(file_remaining) - .try_into()); - - let buf = &mut buf[..bytes_to_read]; - trace!( - buflen = buf.len(), - stream_id = self.stream_id, - file_id = self.file_id, - "will write bytes" - ); + loop { + // read any data remaning in buffer + if let Some(mut buffer) = self.buffer.take() { + if buffer.len() > 0 { + let to_copy = buffer.len().min(tbuf.remaining()); + let data_to_copy = buffer.split_to(to_copy); + tbuf.put_slice(&data_to_copy); + self.advance(to_copy); + if buffer.len() > 0 { + self.buffer = Some(buffer); + } + // TODO: consider how to better reuse the buffer + } + } - poll_try_io!(poll_try_io!(self.torrent.with_storage_and_file( - self.file_id, - |files, _fi| { - files.pread_exact(self.file_id, self.position, buf)?; - Ok::<_, anyhow::Error>(()) + // tbuf if full or contains current piece - return it + if tbuf.remaining() == 0 + || tbuf.filled().len() - initial_buf_size >= current.piece_remaining as usize + { + break; + } else { + trace!( + stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, + "tbuf output buffer is not full, got {} bytes, should get at least {} of remaining capacity {}", + tbuf.filled().len() - initial_buf_size, + current.piece_remaining, + tbuf.remaining() + ); } - ))); - self.as_mut().advance(bytes_to_read as u64); - tbuf.advance(bytes_to_read); + // check if future is finished + if let Some(future) = self.future.as_mut() { + let pinned = Pin::new(future); + match pinned.poll(cx) { + Poll::Ready(Ok(Ok(x))) => { + self.buffer = self + .buffer + .take() + .map(|mut b| { + b.extend_from_slice(&x); + b + }) + .or_else(|| Some(x)); + self.future = None; + } + Poll::Ready(Ok(Err(e))) => { + error!("Read from store error {:?}", e); + return Poll::Ready(map_io_err!(Err(e))); + } + Poll::Ready(Err(e)) => { + error!("Join test error {:?}", e); + return Poll::Ready(Err(e.into())); + } + Poll::Pending => { + // return immediatelly if we have something, rather then waiting for full buffer + if tbuf.filled().len() - initial_buf_size > 0 { + break; + } + return Poll::Pending; + } + } + } else { + // no future, create one + if self.position >= self.file_len { + break; + } + // TODO: this will try to get all piece data into buffer - not sure if we want that as can consume more memory + let buf_capacity: usize = current.piece_remaining as usize; + let mut buf = self + .buffer + .take() + .map(|mut b| { + let need = buf_capacity - b.capacity(); + b.reserve(need); + b + }) + .unwrap_or_else(|| BytesMut::with_capacity(buf_capacity)); + let file_remaining = self.file_len - self.position; + let bytes_to_read: usize = poll_try_io!(file_remaining + .min((buf.capacity() - buf.len()) as u64) + .try_into()); + assert!(bytes_to_read > 0, "no bytes to read"); + let torrent_handle = self.torrent.clone(); + let file_id = self.file_id; + let position = self.position; + + let read_future = spawn_blocking(move || { + torrent_handle + .with_storage_and_file(file_id, move |files, _fi| { + let mut start = buf.split(); + buf.resize(bytes_to_read, 0); + files.pread_exact(file_id, position, &mut buf)?; + start.unsplit(buf); + trace!( + file_id, + "Read {} bytes from store at position {}", + start.len(), + position + ); + Ok::<_, anyhow::Error>(start) + }) + .and_then(|x| x) + }); + self.future = Some(read_future); + } + } Poll::Ready(Ok(())) } } @@ -324,6 +411,9 @@ impl ManagedTorrent { file_len: fd_len, file_torrent_abs_offset: fd_offset, torrent: self, + + future: None, + buffer: None, }; s.torrent.maybe_reconnect_needed_peers_for_file(file_id); streams.streams.insert( @@ -348,8 +438,8 @@ impl FileStream { self.position } - fn advance(&mut self, diff: u64) { - self.set_position(self.position + diff) + fn advance(&mut self, diff: usize) { + self.set_position(self.position + diff as u64) } fn set_position(&mut self, new_pos: u64) {