Other way to handle blocking in streaming.rs #155

Status: Closed · wants to merge 7 commits · changes shown from 6 commits
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,2 +1,3 @@
 /target
-.DS_Store
+.DS_Store
+/.vscode
9 changes: 0 additions & 9 deletions .vscode/settings.json

This file was deleted.

2 changes: 1 addition & 1 deletion crates/librqbit/src/http_api.rs
@@ -170,7 +170,7 @@ impl HttpApi {
         let range_header = headers.get(http::header::RANGE);
         trace!(torrent_id=idx, file_id=file_id, range=?range_header, "request for HTTP stream");

-        if let Some(range) = headers.get(http::header::RANGE) {
+        if let Some(range) = range_header {
             let offset: Option<u64> = range
                 .to_str()
                 .ok()
9 changes: 7 additions & 2 deletions crates/librqbit/src/tests/e2e_stream.rs
@@ -1,6 +1,7 @@
 use std::{net::SocketAddr, time::Duration};

 use anyhow::Context;
+use tempfile::TempDir;
 use tokio::{io::AsyncReadExt, time::timeout};
 use tracing::info;

@@ -22,7 +23,7 @@ async fn e2e_stream() -> anyhow::Result<()> {
     let orig_content = std::fs::read(files.path().join("0.data")).unwrap();

     let server_session = Session::new_with_opts(
-        "/does-not-matter".into(),
+        files.path().into(),
         crate::SessionOptions {
             disable_dht: true,
             persistence: false,
@@ -63,8 +64,10 @@ async fn e2e_stream() -> anyhow::Result<()> {
         server_session.tcp_listen_port().unwrap(),
     );

+    let client_dir = TempDir::with_prefix("test_e2e_stream_client")?;
+
     let client_session = Session::new_with_opts(
-        "/does-not-matter".into(),
+        client_dir.path().into(),
         crate::SessionOptions {
             disable_dht: true,
             persistence: false,
@@ -98,6 +101,8 @@ async fn e2e_stream() -> anyhow::Result<()> {
     let mut buf = Vec::<u8>::with_capacity(8192);
     stream.read_to_end(&mut buf).await?;

+    assert_eq!(buf.len(), orig_content.len(), "sizes differ");
+
     if buf != orig_content {
         panic!("contents differ")
     }
165 changes: 127 additions & 38 deletions crates/librqbit/src/torrent_state/streaming.rs
@@ -1,6 +1,7 @@
 use std::{
     collections::VecDeque,
     io::SeekFrom,
+    pin::Pin,
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -9,11 +10,16 @@ use std::{
 };

 use anyhow::Context;
+use bytes::BytesMut;
 use dashmap::DashMap;

+use futures::Future;
 use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex};
-use tokio::io::{AsyncRead, AsyncSeek};
-use tracing::{debug, trace};
+use tokio::{
+    io::{AsyncRead, AsyncSeek},
+    task::{spawn_blocking, JoinHandle},
+};
+use tracing::{debug, error, trace};

 use crate::{file_info::FileInfo, storage::TorrentStorage, ManagedTorrent};

@@ -136,6 +142,10 @@ pub struct FileStream {
Expand Down Expand Up @@ -136,6 +142,10 @@ pub struct FileStream {
// file params
file_len: u64,
file_torrent_abs_offset: u64,

// for AsyncRead
future: Option<JoinHandle<Result<BytesMut, anyhow::Error>>>,
buffer: Option<BytesMut>,
}

macro_rules! map_io_err {
Expand Down Expand Up @@ -163,7 +173,6 @@ impl AsyncRead for FileStream {
         cx: &mut std::task::Context<'_>,
         tbuf: &mut tokio::io::ReadBuf<'_>,
     ) -> Poll<std::io::Result<()>> {
-        // if the file is over, return 0
         if self.position == self.file_len {
             trace!(
                 stream_id = self.stream_id,
@@ -179,49 +188,126 @@ impl AsyncRead for FileStream {
             .lengths
             .compute_current_piece(self.position, self.file_torrent_abs_offset)
             .context("invalid position"));

+        let initial_buf_size = tbuf.filled().len();
         // if the piece is not there, register to wake when it is
-        // check if we have the piece for real
-        let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| {
-            let have = ct.get_have_pieces()[current.id.get() as usize];
-            if !have {
-                self.streams
-                    .register_waker(self.stream_id, cx.waker().clone());
-            }
-            have
-        }));
+        let have = poll_try_io!(self
+            .torrent
+            .with_chunk_tracker(|ct| ct.get_have_pieces()[current.id.get() as usize]));

Review comment (Owner):
The reason the waker was registered in the callback is that it happens while the chunk_tracker lock is taken. Doing it outside the lock is harder to reason about - it's not obvious that this has no bugs, and that needs to be proven. E.g. consider this race condition:

  1. you check for have and it's false
  2. another thread adds the piece and notifies all existing wakers
  3. you register a waker and it's never notified

Thus it's a race, and the registration needs to move back inside the lock.
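
A minimal sketch of the lost-wakeup race being described, using hypothetical stand-ins (`ChunkTracker`, `Tracker`, `have_or_register`) for the real chunk-tracker and stream-registry types; rqbit's `with_chunk_tracker` callback plays the role of the single-lock version here:

```rust
use std::{sync::Mutex, task::Waker};

// Hypothetical stand-ins for the real chunk tracker and stream registry.
struct ChunkTracker {
    have_pieces: Vec<bool>,
    wakers: Vec<Waker>,
}

struct Tracker(Mutex<ChunkTracker>);

impl Tracker {
    // Racy: the lock is dropped between the check and the registration.
    // A "piece added" notification that fires in that window wakes nobody,
    // and the stream is never polled again.
    fn have_racy(&self, piece: usize, waker: &Waker) -> bool {
        let have = self.0.lock().unwrap().have_pieces[piece];
        if !have {
            // Another thread may set have_pieces[piece] and notify all
            // registered wakers right here - our waker misses the wakeup.
            self.0.lock().unwrap().wakers.push(waker.clone());
        }
        have
    }

    // Safe: the check and the registration happen under one lock
    // acquisition, so the notification can't slip in between them.
    fn have_or_register(&self, piece: usize, waker: &Waker) -> bool {
        let mut ct = self.0.lock().unwrap();
        let have = ct.have_pieces[piece];
        if !have {
            ct.wakers.push(waker.clone());
        }
        have
    }
}
```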

         if !have {
-            trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, "poll pending, not have");
+            self.streams
+                .register_waker(self.stream_id, cx.waker().clone());
+            trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, "poll pending, not have piece yet");
             return Poll::Pending;
         }

-        // actually stream the piece
-        let buf = tbuf.initialize_unfilled();
-        let file_remaining = self.file_len - self.position;
-        let bytes_to_read: usize = poll_try_io!((buf.len() as u64)
-            .min(current.piece_remaining as u64)
-            .min(file_remaining)
-            .try_into());
-
-        let buf = &mut buf[..bytes_to_read];
-        trace!(
-            buflen = buf.len(),
-            stream_id = self.stream_id,
-            file_id = self.file_id,
-            "will write bytes"
-        );
+        loop {

Review comment (Owner):
Overall, I think the previous implementation was much simpler to read and reason about. Not even saying it was simple! But I don't see how it is worth it in the end - the code is much harder to follow, and it requires allocations for the buffer, while previously it didn't.

The added complexity is a price we pay today for solving an issue that is purely theoretical.
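
For orientation, here is the essential shape of the state machine that the loop below implements, reduced to a self-contained sketch. `BlockingBackedStream`, `read_block`, and the block sizing are hypothetical simplifications of `FileStream` and its storage read, not the real API:

```rust
use std::{
    future::Future,
    io,
    pin::Pin,
    task::{Context, Poll},
};

use bytes::BytesMut;
use tokio::{
    io::{AsyncRead, ReadBuf},
    task::{spawn_blocking, JoinHandle},
};

// Hypothetical stand-in for FileStream; `read_block` models the blocking
// storage read done via with_storage_and_file in the real code.
struct BlockingBackedStream {
    position: u64,
    len: u64,
    future: Option<JoinHandle<io::Result<BytesMut>>>,
    buffer: Option<BytesMut>,
}

fn read_block(position: u64, max: usize) -> io::Result<BytesMut> {
    // Placeholder for a blocking pread against torrent storage.
    let _ = position;
    Ok(BytesMut::zeroed(max.min(4096)))
}

impl AsyncRead for BlockingBackedStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        tbuf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let initial_filled = tbuf.filled().len();
        loop {
            // 1. Drain bytes buffered by a previously completed read.
            if let Some(mut buffer) = self.buffer.take() {
                let n = buffer.len().min(tbuf.remaining());
                tbuf.put_slice(&buffer.split_to(n));
                self.position += n as u64;
                if !buffer.is_empty() {
                    self.buffer = Some(buffer);
                }
                if tbuf.remaining() == 0 {
                    return Poll::Ready(Ok(()));
                }
            }
            if let Some(fut) = self.future.as_mut() {
                // 2. Advance the in-flight blocking read.
                match Pin::new(fut).poll(cx) {
                    Poll::Ready(Ok(Ok(bytes))) => {
                        self.future = None;
                        self.buffer = Some(bytes); // drained on the next iteration
                    }
                    Poll::Ready(Ok(Err(e))) => return Poll::Ready(Err(e)),
                    Poll::Ready(Err(join_err)) => {
                        return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, join_err)))
                    }
                    Poll::Pending => {
                        // Hand back partial data instead of waiting for a full buffer.
                        return if tbuf.filled().len() > initial_filled {
                            Poll::Ready(Ok(()))
                        } else {
                            Poll::Pending
                        };
                    }
                }
            } else {
                // 3. Nothing buffered, nothing in flight: EOF, or spawn a new read.
                if self.position >= self.len {
                    return Poll::Ready(Ok(()));
                }
                let pos = self.position;
                let max = (self.len - pos) as usize;
                self.future = Some(spawn_blocking(move || read_block(pos, max)));
            }
        }
    }
}
```

Each poll drains leftover bytes first, then advances the in-flight `spawn_blocking` read, and only then schedules a new one; returning `Ready` with partial data keeps the stream responsive, at the cost of the buffer allocation the reviewer objects to.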

+            // read any data remaining in the buffer
+            if let Some(mut buffer) = self.buffer.take() {
+                if buffer.len() > 0 {
+                    let to_copy = buffer.len().min(tbuf.remaining());
+                    let data_to_copy = buffer.split_to(to_copy);
+                    tbuf.put_slice(&data_to_copy);
+                    self.advance(to_copy);
+                    if buffer.len() > 0 {
+                        self.buffer = Some(buffer);
+                    }
+                    // TODO: consider how to better reuse the buffer
+                }
+            }

-        poll_try_io!(poll_try_io!(self.torrent.with_storage_and_file(
-            self.file_id,
-            |files, _fi| {
-                files.pread_exact(self.file_id, self.position, buf)?;
-                Ok::<_, anyhow::Error>(())
+            // tbuf is full or contains the current piece - return it
+            if tbuf.remaining() == 0
+                || tbuf.filled().len() - initial_buf_size >= current.piece_remaining as usize
+            {
+                break;
+            } else {
+                trace!(
+                    stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id,
+                    "tbuf output buffer is not full, got {} bytes, should get at least {} of remaining capacity {}",
+                    tbuf.filled().len() - initial_buf_size,
+                    current.piece_remaining,
+                    tbuf.remaining()
+                );
+            }
-        )));

-        self.as_mut().advance(bytes_to_read as u64);
-        tbuf.advance(bytes_to_read);
+            // check if the future is finished
+            if let Some(future) = self.future.as_mut() {
+                let pinned = Pin::new(future);
+                match pinned.poll(cx) {
+                    Poll::Ready(Ok(Ok(x))) => {
+                        self.buffer = self
+                            .buffer
+                            .take()
+                            .map(|mut b| {
+                                b.extend_from_slice(&x);
+                                b
+                            })
+                            .or_else(|| Some(x));
+                        self.future = None;
+                    }
+
+                    Poll::Ready(Ok(Err(e))) => {
+                        error!("Read from store error {:?}", e);
+                        return Poll::Ready(map_io_err!(Err(e)));
+                    }
+                    Poll::Ready(Err(e)) => {
+                        error!("Join task error {:?}", e);
+                        return Poll::Ready(Err(e.into()));
+                    }
+                    Poll::Pending => {
+                        // return immediately if we have something, rather than waiting for a full buffer
+                        if tbuf.filled().len() - initial_buf_size > 0 {
+                            break;
+                        }
+                        return Poll::Pending;
+                    }
+                }
+            } else {
+                // no future, create one
+                if self.position >= self.file_len {
+                    break;
+                }
+                // TODO: this will try to get all the piece data into the buffer - not sure if we want that, as it can consume more memory
+                let buf_capacity: usize = current.piece_remaining as usize;
+                let mut buf = self
+                    .buffer
+                    .take()
+                    .map(|mut b| {
+                        let need = buf_capacity - b.capacity();
+                        b.reserve(need);
+                        b
+                    })
+                    .unwrap_or_else(|| BytesMut::with_capacity(buf_capacity));
+                let file_remaining = self.file_len - self.position;
+                let bytes_to_read: usize = poll_try_io!(file_remaining
+                    .min((buf.capacity() - buf.len()) as u64)
+                    .try_into());
+                assert!(bytes_to_read > 0, "no bytes to read");
+                let torrent_handle = self.torrent.clone();
+                let file_id = self.file_id;
+                let position = self.position;
+
+                let read_future = spawn_blocking(move || {
+                    torrent_handle
+                        .with_storage_and_file(file_id, move |files, _fi| {
+                            let mut start = buf.split();
+                            buf.resize(bytes_to_read, 0);
+                            files.pread_exact(file_id, position, &mut buf)?;
+                            start.unsplit(buf);
+                            trace!(
+                                file_id,
+                                "Read {} bytes from store at position {}",
+                                start.len(),
+                                position
+                            );
+                            Ok::<_, anyhow::Error>(start)
+                        })
+                        .and_then(|x| x)
+                });
+                self.future = Some(read_future);
+            }
+        }
         Poll::Ready(Ok(()))
     }
 }
@@ -324,6 +410,9 @@ impl ManagedTorrent {
             file_len: fd_len,
             file_torrent_abs_offset: fd_offset,
             torrent: self,
+
+            future: None,
+            buffer: None,
         };
         s.torrent.maybe_reconnect_needed_peers_for_file(file_id);
         streams.streams.insert(
@@ -348,8 +437,8 @@ impl FileStream {
         self.position
     }

-    fn advance(&mut self, diff: u64) {
-        self.set_position(self.position + diff)
+    fn advance(&mut self, diff: usize) {
+        self.set_position(self.position + diff as u64)
    }

     fn set_position(&mut self, new_pos: u64) {
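
For completeness, a sketch of how such a stream is consumed end to end, mirroring the e2e test above. The `stream` method name on `ManagedTorrent` and the `usize` file-id type are assumptions inferred from this diff, not a confirmed public API:

```rust
use std::sync::Arc;

use anyhow::Context;
use tokio::io::AsyncReadExt;

// `torrent` is an Arc<ManagedTorrent> obtained from a running Session.
async fn read_whole_file(
    torrent: Arc<librqbit::ManagedTorrent>,
    file_id: usize,
) -> anyhow::Result<Vec<u8>> {
    // Assumed constructor from the hunk above: builds a FileStream and
    // nudges peers serving that file to reconnect.
    let mut stream = torrent.stream(file_id).context("creating stream")?;
    let mut buf = Vec::new();
    // poll_read returns Pending until the needed pieces arrive, so this
    // simply awaits until the whole file has been downloaded and read.
    stream.read_to_end(&mut buf).await?;
    Ok(buf)
}
```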