From 1c9aa8ca72ea3586990e6f094a7be80d82634e49 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sat, 20 Jul 2024 20:03:14 +0200 Subject: [PATCH 1/7] First naive attempt to improve streaming: at least inform executor about blocking call --- crates/librqbit/src/http_api.rs | 2 +- .../librqbit/src/torrent_state/streaming.rs | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 57c4d8bb..dbd82746 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -170,7 +170,7 @@ impl HttpApi { let range_header = headers.get(http::header::RANGE); trace!(torrent_id=idx, file_id=file_id, range=?range_header, "request for HTTP stream"); - if let Some(range) = headers.get(http::header::RANGE) { + if let Some(range) = range_header { let offset: Option = range .to_str() .ok() diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 48b4b3ed..e96e9665 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -12,7 +12,10 @@ use anyhow::Context; use dashmap::DashMap; use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex}; -use tokio::io::{AsyncRead, AsyncSeek}; +use tokio::{ + io::{AsyncRead, AsyncSeek}, + task::block_in_place, +}; use tracing::{debug, trace}; use crate::{file_info::FileInfo, storage::TorrentStorage, ManagedTorrent}; @@ -211,13 +214,13 @@ impl AsyncRead for FileStream { "will write bytes" ); - poll_try_io!(poll_try_io!(self.torrent.with_storage_and_file( - self.file_id, - |files, _fi| { - files.pread_exact(self.file_id, self.position, buf)?; - Ok::<_, anyhow::Error>(()) - } - ))); + poll_try_io!(poll_try_io!(block_in_place(|| { + self.torrent + .with_storage_and_file(self.file_id, |files, _fi| { + files.pread_exact(self.file_id, self.position, buf)?; + Ok::<_, anyhow::Error>(()) + }) + }))); 
self.as_mut().advance(bytes_to_read as u64); tbuf.advance(bytes_to_read); From 4305d25d1a6b3acc5a411068f18a644aeb3e44a3 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sun, 21 Jul 2024 15:08:23 +0200 Subject: [PATCH 2/7] Fix e2e streaming test --- crates/librqbit/src/tests/e2e_stream.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/tests/e2e_stream.rs b/crates/librqbit/src/tests/e2e_stream.rs index 97c65ead..770f4d13 100644 --- a/crates/librqbit/src/tests/e2e_stream.rs +++ b/crates/librqbit/src/tests/e2e_stream.rs @@ -1,6 +1,7 @@ use std::{net::SocketAddr, time::Duration}; use anyhow::Context; +use tempfile::TempDir; use tokio::{io::AsyncReadExt, time::timeout}; use tracing::info; @@ -22,7 +23,7 @@ async fn e2e_stream() -> anyhow::Result<()> { let orig_content = std::fs::read(files.path().join("0.data")).unwrap(); let server_session = Session::new_with_opts( - "/does-not-matter".into(), + files.path().into(), crate::SessionOptions { disable_dht: true, persistence: false, @@ -63,8 +64,10 @@ async fn e2e_stream() -> anyhow::Result<()> { server_session.tcp_listen_port().unwrap(), ); + let client_dir = TempDir::with_prefix("test_e2e_stream_client")?; + let client_session = Session::new_with_opts( - "/does-not-matter".into(), + client_dir.path().into(), crate::SessionOptions { disable_dht: true, persistence: false, From c670974956d9464587acf832273387a71d5e09bc Mon Sep 17 00:00:00 2001 From: Ivan Date: Sun, 21 Jul 2024 15:09:02 +0200 Subject: [PATCH 3/7] WIP on streaming --- .../librqbit/src/torrent_state/streaming.rs | 163 ++++++++++++++---- 1 file changed, 128 insertions(+), 35 deletions(-) diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index e96e9665..eb95c273 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -1,6 +1,7 @@ use std::{ collections::VecDeque, io::SeekFrom, + pin::Pin, sync::{ 
atomic::{AtomicUsize, Ordering}, Arc, @@ -9,12 +10,14 @@ use std::{ }; use anyhow::Context; +use bytes::BytesMut; use dashmap::DashMap; +use futures::Future; use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex}; use tokio::{ io::{AsyncRead, AsyncSeek}, - task::block_in_place, + task::{block_in_place, spawn_blocking, JoinHandle}, }; use tracing::{debug, trace}; @@ -139,6 +142,10 @@ pub struct FileStream { // file params file_len: u64, file_torrent_abs_offset: u64, + + // for AsyncRead + future: Option>>, + buffer: Option, } macro_rules! map_io_err { @@ -185,45 +192,128 @@ impl AsyncRead for FileStream { // if the piece is not there, register to wake when it is // check if we have the piece for real - let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| { - let have = ct.get_have_pieces()[current.id.get() as usize]; - if !have { - self.streams - .register_waker(self.stream_id, cx.waker().clone()); - } - have - })); + let have = poll_try_io!(self + .torrent + .with_chunk_tracker(|ct| ct.get_have_pieces()[current.id.get() as usize])); if !have { + self.streams + .register_waker(self.stream_id, cx.waker().clone()); trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, "poll pending, not have"); return Poll::Pending; } - // actually stream the piece - let buf = tbuf.initialize_unfilled(); - let file_remaining = self.file_len - self.position; - let bytes_to_read: usize = poll_try_io!((buf.len() as u64) - .min(current.piece_remaining as u64) - .min(file_remaining) - .try_into()); - - let buf = &mut buf[..bytes_to_read]; - trace!( - buflen = buf.len(), - stream_id = self.stream_id, - file_id = self.file_id, - "will write bytes" - ); + loop { + // read any data remaning in buffer + if let Some(mut buffer) = self.buffer.take() { + if buffer.len() > 0 { + let to_copy = buffer.len().min(tbuf.remaining()); + let data_to_copy = buffer.split_to(to_copy); + tbuf.put_slice(&data_to_copy); + self.advance(to_copy); + if buffer.len() 
> 0 { + self.buffer = Some(buffer); + } + } + } - poll_try_io!(poll_try_io!(block_in_place(|| { - self.torrent - .with_storage_and_file(self.file_id, |files, _fi| { - files.pread_exact(self.file_id, self.position, buf)?; - Ok::<_, anyhow::Error>(()) - }) - }))); + // tbuf if full nothing else to do + if tbuf.remaining() == 0 { + break; + } - self.as_mut().advance(bytes_to_read as u64); - tbuf.advance(bytes_to_read); + // check if future finished + if let Some(future) = self.future.as_mut() { + let pinned = Pin::new(future); + match pinned.poll(cx) { + Poll::Ready(Ok(Ok(x))) => { + self.buffer = self + .buffer + .take() + .map(|mut b| { + b.extend_from_slice(&x); + b + }) + .or_else(|| Some(x)); + self.future = None; + } + + Poll::Ready(Ok(Err(e))) => { + return Poll::Ready(map_io_err!(Err(e))); + } + Poll::Ready(Err(e)) => { + debug!("join error {:?}", e); + return Poll::Ready(Err(e.into())); + } + Poll::Pending => { + return Poll::Pending; + } + } + } else { + // no future, create one + if self.position >= self.file_len { + break; + } + const BUF_CAPACITY: usize = 4 * 1024; + let mut buf = self + .buffer + .take() + .map(|mut b| { + let need = BUF_CAPACITY - b.capacity(); + b.reserve(need); + b + }) + .unwrap_or_else(|| BytesMut::with_capacity(BUF_CAPACITY)); + let file_remaining = self.file_len - self.position; + let bytes_to_read: usize = poll_try_io!((current.piece_remaining as u64) + .min(file_remaining) + .min((buf.capacity() - buf.len()) as u64) + .try_into()); + assert!(bytes_to_read > 0, "no bytes to read"); + let torrent_handle = self.torrent.clone(); + let file_id = self.file_id; + let position = self.position; + + let read_future = spawn_blocking(move || { + torrent_handle + .with_storage_and_file(file_id, move |files, _fi| { + let mut start = buf.split(); + buf.resize(bytes_to_read, 0); + files.pread_exact(file_id, position, &mut buf)?; + start.unsplit(buf); + Ok::<_, anyhow::Error>(start) + }) + .and_then(|x| x) + }); + self.future = 
Some(read_future); + } + + // actually stream the piece + // let buf = tbuf.initialize_unfilled(); + // let file_remaining = self.file_len - self.position; + // let bytes_to_read: usize = poll_try_io!((buf.len() as u64) + // .min(current.piece_remaining as u64) + // .min(file_remaining) + // .try_into()); + + // let buf = &mut buf[..bytes_to_read]; + // trace!( + // buflen = buf.len(), + // stream_id = self.stream_id, + // file_id = self.file_id, + // "will write bytes" + // ); + } + + // poll_try_io!(poll_try_io!(block_in_place(|| { + // self.torrent + // .with_storage_and_file(self.file_id, |files, _fi| { + // files.pread_exact(self.file_id, self.position, buf)?; + // Ok::<_, anyhow::Error>(()) + // }) + // }))); + + //self.as_mut().advance(bytes_read as u64); + // tbuf.advance(bytes_to_read); Poll::Ready(Ok(())) } @@ -327,6 +417,9 @@ impl ManagedTorrent { file_len: fd_len, file_torrent_abs_offset: fd_offset, torrent: self, + + future: None, + buffer: None, }; s.torrent.maybe_reconnect_needed_peers_for_file(file_id); streams.streams.insert( @@ -351,8 +444,8 @@ impl FileStream { self.position } - fn advance(&mut self, diff: u64) { - self.set_position(self.position + diff) + fn advance(&mut self, diff: usize) { + self.set_position(self.position + diff as u64) } fn set_position(&mut self, new_pos: u64) { From e4a0078b8fb732bc8bcb335480f4338d7f40e17e Mon Sep 17 00:00:00 2001 From: Ivan Date: Sun, 21 Jul 2024 19:44:54 +0200 Subject: [PATCH 4/7] FileStream to run blocking read in blocking threads --- crates/librqbit/src/tests/e2e_stream.rs | 2 + .../librqbit/src/torrent_state/streaming.rs | 75 +++++++++---------- 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/crates/librqbit/src/tests/e2e_stream.rs b/crates/librqbit/src/tests/e2e_stream.rs index 770f4d13..8ba4f7a3 100644 --- a/crates/librqbit/src/tests/e2e_stream.rs +++ b/crates/librqbit/src/tests/e2e_stream.rs @@ -101,6 +101,8 @@ async fn e2e_stream() -> anyhow::Result<()> { let mut buf = 
Vec::::with_capacity(8192); stream.read_to_end(&mut buf).await?; + assert_eq!(buf.len(), orig_content.len(), "sizes differ"); + if buf != orig_content { panic!("contents differ") } diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index eb95c273..6366f698 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -19,7 +19,7 @@ use tokio::{ io::{AsyncRead, AsyncSeek}, task::{block_in_place, spawn_blocking, JoinHandle}, }; -use tracing::{debug, trace}; +use tracing::{debug, error, trace}; use crate::{file_info::FileInfo, storage::TorrentStorage, ManagedTorrent}; @@ -173,7 +173,6 @@ impl AsyncRead for FileStream { cx: &mut std::task::Context<'_>, tbuf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { - // if the file is over, return 0 if self.position == self.file_len { trace!( stream_id = self.stream_id, @@ -189,7 +188,7 @@ impl AsyncRead for FileStream { .lengths .compute_current_piece(self.position, self.file_torrent_abs_offset) .context("invalid position")); - + let initial_buf_size = tbuf.filled().len(); // if the piece is not there, register to wake when it is // check if we have the piece for real let have = poll_try_io!(self @@ -198,7 +197,7 @@ impl AsyncRead for FileStream { if !have { self.streams .register_waker(self.stream_id, cx.waker().clone()); - trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, "poll pending, not have"); + trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, "poll pending, not have piece yet"); return Poll::Pending; } @@ -213,15 +212,26 @@ impl AsyncRead for FileStream { if buffer.len() > 0 { self.buffer = Some(buffer); } + // TODO: consider how to better reuse the buffer } } - // tbuf if full nothing else to do - if tbuf.remaining() == 0 { + // tbuf if full or contains current piece - return it + if tbuf.remaining() == 0 + || tbuf.filled().len() - initial_buf_size 
>= current.piece_remaining as usize + { break; + } else { + trace!( + stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, + "tbuf output buffer is not full, got {} bytes, should get at least {} of remaining capacity {}", + tbuf.filled().len() - initial_buf_size, + current.piece_remaining, + tbuf.remaining() + ); } - // check if future finished + // check if future is finished if let Some(future) = self.future.as_mut() { let pinned = Pin::new(future); match pinned.poll(cx) { @@ -238,13 +248,18 @@ impl AsyncRead for FileStream { } Poll::Ready(Ok(Err(e))) => { + error!("Read from store error {:?}", e); return Poll::Ready(map_io_err!(Err(e))); } Poll::Ready(Err(e)) => { - debug!("join error {:?}", e); + error!("Join test error {:?}", e); return Poll::Ready(Err(e.into())); } Poll::Pending => { + // return immediatelly if we have something, rather then waiting for full buffer + if tbuf.filled().len() - initial_buf_size > 0 { + break; + } return Poll::Pending; } } @@ -253,19 +268,19 @@ impl AsyncRead for FileStream { if self.position >= self.file_len { break; } - const BUF_CAPACITY: usize = 4 * 1024; + // TODO: this will try to get all piece data into buffer - not sure if we want that as can consume more memory + let buf_capacity: usize = current.piece_remaining as usize; let mut buf = self .buffer .take() .map(|mut b| { - let need = BUF_CAPACITY - b.capacity(); + let need = buf_capacity - b.capacity(); b.reserve(need); b }) - .unwrap_or_else(|| BytesMut::with_capacity(BUF_CAPACITY)); + .unwrap_or_else(|| BytesMut::with_capacity(buf_capacity)); let file_remaining = self.file_len - self.position; - let bytes_to_read: usize = poll_try_io!((current.piece_remaining as u64) - .min(file_remaining) + let bytes_to_read: usize = poll_try_io!(file_remaining .min((buf.capacity() - buf.len()) as u64) .try_into()); assert!(bytes_to_read > 0, "no bytes to read"); @@ -280,41 +295,19 @@ impl AsyncRead for FileStream { buf.resize(bytes_to_read, 0); 
files.pread_exact(file_id, position, &mut buf)?; start.unsplit(buf); + trace!( + file_id, + "Read {} bytes from store at position {}", + start.len(), + position + ); Ok::<_, anyhow::Error>(start) }) .and_then(|x| x) }); self.future = Some(read_future); } - - // actually stream the piece - // let buf = tbuf.initialize_unfilled(); - // let file_remaining = self.file_len - self.position; - // let bytes_to_read: usize = poll_try_io!((buf.len() as u64) - // .min(current.piece_remaining as u64) - // .min(file_remaining) - // .try_into()); - - // let buf = &mut buf[..bytes_to_read]; - // trace!( - // buflen = buf.len(), - // stream_id = self.stream_id, - // file_id = self.file_id, - // "will write bytes" - // ); } - - // poll_try_io!(poll_try_io!(block_in_place(|| { - // self.torrent - // .with_storage_and_file(self.file_id, |files, _fi| { - // files.pread_exact(self.file_id, self.position, buf)?; - // Ok::<_, anyhow::Error>(()) - // }) - // }))); - - //self.as_mut().advance(bytes_read as u64); - // tbuf.advance(bytes_to_read); - Poll::Ready(Ok(())) } } From 64812a95d48e640d12fe2e8b6c98f9f5dcc74e25 Mon Sep 17 00:00:00 2001 From: Ivan Date: Sun, 21 Jul 2024 19:45:42 +0200 Subject: [PATCH 5/7] Add .vscode to .gitignore --- .gitignore | 3 ++- .vscode/settings.json | 9 --------- 2 files changed, 2 insertions(+), 10 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.gitignore b/.gitignore index 212de442..761f1e64 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target -.DS_Store \ No newline at end of file +.DS_Store +/.vscode \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index dc6e86ef..00000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "editor.defaultFormatter": "esbenp.prettier-vscode", - "[javascript]": { - "editor.defaultFormatter": "esbenp.prettier-vscode" - }, - "[rust]": { - "editor.defaultFormatter": "rust-lang.rust-analyzer" - } -} From 
59791e27c6493734079f16cf397c42e4aa37af4f Mon Sep 17 00:00:00 2001 From: Ivan Date: Sun, 21 Jul 2024 19:56:20 +0200 Subject: [PATCH 6/7] Cleanup --- crates/librqbit/src/torrent_state/streaming.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 6366f698..79d21450 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -17,7 +17,7 @@ use futures::Future; use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex}; use tokio::{ io::{AsyncRead, AsyncSeek}, - task::{block_in_place, spawn_blocking, JoinHandle}, + task::{spawn_blocking, JoinHandle}, }; use tracing::{debug, error, trace}; From b6573cc062df3446838073cbb9c1a1369ee51bbd Mon Sep 17 00:00:00 2001 From: Ivan Date: Tue, 23 Jul 2024 21:35:50 +0200 Subject: [PATCH 7/7] wake in context --- crates/librqbit/src/torrent_state/streaming.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 79d21450..a024b20f 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -191,12 +191,13 @@ impl AsyncRead for FileStream { let initial_buf_size = tbuf.filled().len(); // if the piece is not there, register to wake when it is // check if we have the piece for real - let have = poll_try_io!(self - .torrent - .with_chunk_tracker(|ct| ct.get_have_pieces()[current.id.get() as usize])); - if !have { + let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| { + let have = ct.get_have_pieces()[current.id.get() as usize]; self.streams .register_waker(self.stream_id, cx.waker().clone()); + have + })); + if !have { trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, "poll pending, not have piece yet"); return Poll::Pending; }