From 35d57ae8a2445ac2858540b469a2e8218f772117 Mon Sep 17 00:00:00 2001 From: LIAUD Corentin Date: Tue, 27 Aug 2024 22:00:14 +0200 Subject: [PATCH 1/2] feat: add on_piece_completed method on TorrentStorage --- crates/librqbit/examples/custom_storage.rs | 4 + crates/librqbit/src/file_ops.rs | 2 + crates/librqbit/src/peer_connection.rs | 5 +- .../librqbit/src/storage/examples/inmemory.rs | 75 ++++++++++++++++--- crates/librqbit/src/storage/examples/mmap.rs | 10 ++- crates/librqbit/src/storage/filesystem/fs.rs | 5 ++ .../librqbit/src/storage/filesystem/mmap.rs | 5 ++ crates/librqbit/src/storage/mod.rs | 7 ++ crates/librqbit/src/tests/test_util.rs | 3 +- .../src/extended/handshake.rs | 2 +- 10 files changed, 100 insertions(+), 18 deletions(-) diff --git a/crates/librqbit/examples/custom_storage.rs b/crates/librqbit/examples/custom_storage.rs index f22daa88..7ba7828c 100644 --- a/crates/librqbit/examples/custom_storage.rs +++ b/crates/librqbit/examples/custom_storage.rs @@ -57,6 +57,10 @@ impl TorrentStorage for CustomStorage { fn init(&mut self, _meta: &librqbit::ManagedTorrentShared) -> anyhow::Result<()> { anyhow::bail!("not implemented") } + + fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> { + anyhow::bail!("not implemented") + } } #[tokio::main] diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index 237b08fd..45b4c935 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -215,6 +215,8 @@ impl<'a> FileOps<'a> { format!("error reading {to_read_in_file} bytes, file_id: {file_idx} (\"{name:?}\")") })?; + self.files.on_piece_completed(file_idx, absolute_offset)?; + piece_remaining_bytes -= to_read_in_file; if piece_remaining_bytes == 0 { diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 9f99f0d0..8401ae85 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -14,7 +14,10 @@ use librqbit_core::{ }; use parking_lot::RwLock; use peer_binary_protocol::{ - extended::{handshake::{ExtendedHandshake, YourIP}, ExtendedMessage}, + extended::{ + handshake::{ExtendedHandshake, YourIP}, + ExtendedMessage, + }, serialize_piece_preamble, Handshake, Message, MessageOwned, PIECE_MESSAGE_DEFAULT_LEN, }; use serde::{Deserialize, Serialize}; diff --git a/crates/librqbit/src/storage/examples/inmemory.rs b/crates/librqbit/src/storage/examples/inmemory.rs index 3bd2a389..41089504 100644 --- a/crates/librqbit/src/storage/examples/inmemory.rs +++ b/crates/librqbit/src/storage/examples/inmemory.rs @@ -8,28 +8,50 @@ use crate::type_aliases::FileInfos; use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; -struct InMemoryPiece { - bytes: Box<[u8]>, +pub struct InMemoryPiece { + pub content: Box<[u8]>, + pub has_been_validated: bool, } impl InMemoryPiece { - fn new(l: &Lengths) -> Self { + pub fn new(l: &Lengths) -> Self { let v = vec![0; l.default_piece_length() as usize].into_boxed_slice(); - Self { bytes: v } + Self { + content: v, + has_been_validated: false, + } + } + + pub fn can_be_discard(&self, upper_bound_offset: usize) -> bool { + self.has_been_validated && upper_bound_offset >= self.content.len() } } #[derive(Default, Clone)] -pub struct InMemoryExampleStorageFactory {} +pub struct InMemoryExampleStorageFactory { + max_ram_size_per_torrent: usize, +} + +impl InMemoryExampleStorageFactory { + pub fn new(max_ram_size_per_torrent: usize) -> Self { + Self { + max_ram_size_per_torrent, + } + } +} impl StorageFactory for InMemoryExampleStorageFactory { type Storage = InMemoryExampleStorage; fn create( &self, - info: &crate::torrent_state::ManagedTorrentInfo, + info: &crate::torrent_state::ManagedTorrentShared, ) -> anyhow::Result { - InMemoryExampleStorage::new(info.lengths, info.file_infos.clone()) + InMemoryExampleStorage::new( + info.lengths, + info.file_infos.clone(), + self.max_ram_size_per_torrent, + ) } fn clone_box(&self) -> crate::storage::BoxStorageFactory { @@ -41,10 +63,15 @@ pub struct InMemoryExampleStorage { lengths: Lengths, file_infos: FileInfos, map: RwLock>, + max_ram_size_per_torrent: usize, } impl InMemoryExampleStorage { - fn new(lengths: Lengths, file_infos: FileInfos) -> anyhow::Result { + fn new( + lengths: Lengths, + file_infos: FileInfos, + max_ram_size_per_torrent: usize, + ) -> anyhow::Result { // Max memory 128MiB. Make it tunable let max_pieces = 128 * 1024 * 1024 / lengths.default_piece_length(); if max_pieces == 0 { @@ -55,6 +82,7 @@ impl InMemoryExampleStorage { lengths, file_infos, map: RwLock::new(HashMap::new()), + max_ram_size_per_torrent, }) } } @@ -68,9 +96,16 @@ impl TorrentStorage for InMemoryExampleStorage { (abs_offset % self.lengths.default_piece_length() as u64).try_into()?; let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?; - let g = self.map.read(); + let mut g = self.map.write(); + // Get and remove this data from buffer to free space let inmp = g.get(&piece_id).context("piece expired")?; - buf.copy_from_slice(&inmp.bytes[piece_offset..(piece_offset + buf.len())]); + let upper_bound_offset = piece_offset + buf.len(); + buf.copy_from_slice(&inmp.content[piece_offset..upper_bound_offset]); + + if inmp.can_be_discard(upper_bound_offset) { + let _ = g.remove(&piece_id); + } + Ok(()) } @@ -81,11 +116,12 @@ impl TorrentStorage for InMemoryExampleStorage { let piece_offset: usize = (abs_offset % self.lengths.default_piece_length() as u64).try_into()?; let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?; + let mut g = self.map.write(); let inmp = g .entry(piece_id) .or_insert_with(|| InMemoryPiece::new(&self.lengths)); - inmp.bytes[piece_offset..(piece_offset + buf.len())].copy_from_slice(buf); + inmp.content[piece_offset..(piece_offset + buf.len())].copy_from_slice(buf); Ok(()) } @@ -108,14 +144,29 @@ impl TorrentStorage for InMemoryExampleStorage { lengths: self.lengths, map: RwLock::new(map), file_infos: self.file_infos.clone(), + max_ram_size_per_torrent: self.max_ram_size_per_torrent, })) } - fn init(&mut self, _meta: &crate::ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, _meta: &crate::ManagedTorrentShared) -> anyhow::Result<()> { Ok(()) } fn remove_directory_if_empty(&self, _path: &Path) -> anyhow::Result<()> { Ok(()) } + + fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()> { + let fi = &self.file_infos[file_id]; + let abs_offset = fi.offset_in_torrent + offset; + let piece_id: u32 = (abs_offset / self.lengths.default_piece_length() as u64).try_into()?; + let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?; + + let mut g = self.map.write(); + let inmp = g.get_mut(&piece_id).context("piece does not exist")?; + + inmp.has_been_validated = true; + + Ok(()) + } } diff --git a/crates/librqbit/src/storage/examples/mmap.rs b/crates/librqbit/src/storage/examples/mmap.rs index e448417a..8c07f5da 100644 --- a/crates/librqbit/src/storage/examples/mmap.rs +++ b/crates/librqbit/src/storage/examples/mmap.rs @@ -4,7 +4,7 @@ use parking_lot::RwLock; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, - FileInfos, ManagedTorrentInfo, + FileInfos, ManagedTorrentShared, }; #[derive(Default, Clone)] @@ -18,7 +18,7 @@ pub struct MmapStorage { impl StorageFactory for MmapStorageFactory { type Storage = MmapStorage; - fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result { Ok(MmapStorage { mmap: RwLock::new( MmapOptions::new() @@ -63,11 +63,15 @@ impl TorrentStorage for MmapStorage { anyhow::bail!("not implemented") } - fn init(&mut self, _meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + fn init(&mut self, _meta: &ManagedTorrentShared) -> anyhow::Result<()> { Ok(()) } fn remove_directory_if_empty(&self, _path: &std::path::Path) -> anyhow::Result<()> { Ok(()) } + + fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> { + Ok(()) + } } diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index 74ac5961..ff062e84 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -188,4 +188,9 @@ impl TorrentStorage for FilesystemStorage { self.opened_files = files; Ok(()) } + + fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> { + // No job has to be done for this storage + Ok(()) + } } diff --git a/crates/librqbit/src/storage/filesystem/mmap.rs b/crates/librqbit/src/storage/filesystem/mmap.rs index 5b92657a..e001dfe2 100644 --- a/crates/librqbit/src/storage/filesystem/mmap.rs +++ b/crates/librqbit/src/storage/filesystem/mmap.rs @@ -112,4 +112,9 @@ impl TorrentStorage for MmapFilesystemStorage { self.opened_mmaps = mmaps; Ok(()) } + + fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> { + // No job has to be done for this storage + Ok(()) + } } diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index ff710d03..0d53c340 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -97,6 +97,9 @@ pub trait TorrentStorage: Send + Sync { /// Replace the current storage with a dummy, and return a new one that should be used instead. /// This is used to make the underlying object useless when e.g. pausing the torrent. fn take(&self) -> anyhow::Result>; + + /// Callback called every time a piece has completed and has been validated. + fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()>; } impl TorrentStorage for Box { @@ -127,4 +130,8 @@ impl TorrentStorage for Box { fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { (**self).init(meta) } + + fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()> { + (**self).on_piece_completed(file_id, offset) + } } diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index 35a3ae63..f33baa2c 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -5,7 +5,7 @@ use std::{ time::Duration, }; -use anyhow::{bail, Context}; +use anyhow::bail; use librqbit_core::Id20; use parking_lot::RwLock; use rand::{thread_rng, Rng, RngCore, SeedableRng}; @@ -97,6 +97,7 @@ impl TestPeerMetadata { #[cfg(feature = "http-api")] async fn debug_server() -> anyhow::Result<()> { + use anyhow::Context; use axum::{response::IntoResponse, routing::get, Router}; async fn backtraces() -> impl IntoResponse { #[cfg(feature = "async-bt")] diff --git a/crates/peer_binary_protocol/src/extended/handshake.rs b/crates/peer_binary_protocol/src/extended/handshake.rs index 3b1a3d5e..7cc05dfc 100644 --- a/crates/peer_binary_protocol/src/extended/handshake.rs +++ b/crates/peer_binary_protocol/src/extended/handshake.rs @@ -99,7 +99,7 @@ impl Serialize for YourIP { IpAddr::V6(ipv6) => { let buf = ipv6.octets(); serializer.serialize_bytes(&buf) - }, + } } } } From 9183df0ebd14b3026028cda077b0757128facaed Mon Sep 17 00:00:00 2001 From: LIAUD Corentin Date: Wed, 28 Aug 2024 08:26:06 +0200 Subject: [PATCH 2/2] fix: fixes according to PR comments - Reset previous implementation of InMemoryExampleStorage - Implement default (empty) behaviour of on_piece_completed in trait itself - Now passing a ValidPieceIndex in on_piece_completed --- crates/librqbit/examples/custom_storage.rs | 4 -- crates/librqbit/src/file_ops.rs | 2 +- .../librqbit/src/storage/examples/inmemory.rs | 71 +++---------------- crates/librqbit/src/storage/filesystem/fs.rs | 5 -- .../librqbit/src/storage/filesystem/mmap.rs | 5 -- crates/librqbit/src/storage/mod.rs | 11 ++- 6 files changed, 19 insertions(+), 79 deletions(-) diff --git a/crates/librqbit/examples/custom_storage.rs b/crates/librqbit/examples/custom_storage.rs index 7ba7828c..f22daa88 100644 --- a/crates/librqbit/examples/custom_storage.rs +++ b/crates/librqbit/examples/custom_storage.rs @@ -57,10 +57,6 @@ impl TorrentStorage for CustomStorage { fn init(&mut self, _meta: &librqbit::ManagedTorrentShared) -> anyhow::Result<()> { anyhow::bail!("not implemented") } - - fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> { - anyhow::bail!("not implemented") - } } #[tokio::main] diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index 45b4c935..27052f96 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -215,7 +215,7 @@ impl<'a> FileOps<'a> { format!("error reading {to_read_in_file} bytes, file_id: {file_idx} (\"{name:?}\")") })?; - self.files.on_piece_completed(file_idx, absolute_offset)?; + self.files.on_piece_completed(piece_index)?; piece_remaining_bytes -= to_read_in_file; diff --git a/crates/librqbit/src/storage/examples/inmemory.rs b/crates/librqbit/src/storage/examples/inmemory.rs index 41089504..5ba70206 100644 --- a/crates/librqbit/src/storage/examples/inmemory.rs +++ b/crates/librqbit/src/storage/examples/inmemory.rs @@ -8,37 +8,19 @@ use crate::type_aliases::FileInfos; use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; -pub struct InMemoryPiece { - pub content: Box<[u8]>, - pub has_been_validated: bool, +struct InMemoryPiece { + bytes: Box<[u8]>, } impl InMemoryPiece { - pub fn new(l: &Lengths) -> Self { + fn new(l: &Lengths) -> Self { let v = vec![0; l.default_piece_length() as usize].into_boxed_slice(); - Self { - content: v, - has_been_validated: false, - } - } - - pub fn can_be_discard(&self, upper_bound_offset: usize) -> bool { - self.has_been_validated && upper_bound_offset >= self.content.len() + Self { bytes: v } } } #[derive(Default, Clone)] -pub struct InMemoryExampleStorageFactory { - max_ram_size_per_torrent: usize, -} - -impl InMemoryExampleStorageFactory { - pub fn new(max_ram_size_per_torrent: usize) -> Self { - Self { - max_ram_size_per_torrent, - } - } -} +pub struct InMemoryExampleStorageFactory {} impl StorageFactory for InMemoryExampleStorageFactory { type Storage = InMemoryExampleStorage; @@ -47,11 +29,7 @@ impl StorageFactory for InMemoryExampleStorageFactory { &self, info: &crate::torrent_state::ManagedTorrentShared, ) -> anyhow::Result { - InMemoryExampleStorage::new( - info.lengths, - info.file_infos.clone(), - self.max_ram_size_per_torrent, - ) + InMemoryExampleStorage::new(info.lengths, info.file_infos.clone()) } fn clone_box(&self) -> crate::storage::BoxStorageFactory { @@ -63,15 +41,10 @@ pub struct InMemoryExampleStorage { lengths: Lengths, file_infos: FileInfos, map: RwLock>, - max_ram_size_per_torrent: usize, } impl InMemoryExampleStorage { - fn new( - lengths: Lengths, - file_infos: FileInfos, - max_ram_size_per_torrent: usize, - ) -> anyhow::Result { + fn new(lengths: Lengths, file_infos: FileInfos) -> anyhow::Result { // Max memory 128MiB. Make it tunable let max_pieces = 128 * 1024 * 1024 / lengths.default_piece_length(); if max_pieces == 0 { @@ -82,7 +55,6 @@ impl InMemoryExampleStorage { lengths, file_infos, map: RwLock::new(HashMap::new()), - max_ram_size_per_torrent, }) } } @@ -96,16 +68,9 @@ impl TorrentStorage for InMemoryExampleStorage { (abs_offset % self.lengths.default_piece_length() as u64).try_into()?; let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?; - let mut g = self.map.write(); - // Get and remove this data from buffer to free space + let g = self.map.read(); let inmp = g.get(&piece_id).context("piece expired")?; - let upper_bound_offset = piece_offset + buf.len(); - buf.copy_from_slice(&inmp.content[piece_offset..upper_bound_offset]); - - if inmp.can_be_discard(upper_bound_offset) { - let _ = g.remove(&piece_id); - } - + buf.copy_from_slice(&inmp.bytes[piece_offset..(piece_offset + buf.len())]); Ok(()) } @@ -116,12 +81,11 @@ impl TorrentStorage for InMemoryExampleStorage { let piece_offset: usize = (abs_offset % self.lengths.default_piece_length() as u64).try_into()?; let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?; - let mut g = self.map.write(); let inmp = g .entry(piece_id) .or_insert_with(|| InMemoryPiece::new(&self.lengths)); - inmp.content[piece_offset..(piece_offset + buf.len())].copy_from_slice(buf); + inmp.bytes[piece_offset..(piece_offset + buf.len())].copy_from_slice(buf); Ok(()) } @@ -144,7 +108,6 @@ impl TorrentStorage for InMemoryExampleStorage { lengths: self.lengths, map: RwLock::new(map), file_infos: self.file_infos.clone(), - max_ram_size_per_torrent: self.max_ram_size_per_torrent, })) } @@ -155,18 +118,4 @@ impl TorrentStorage for InMemoryExampleStorage { fn remove_directory_if_empty(&self, _path: &Path) -> anyhow::Result<()> { Ok(()) } - - fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()> { - let fi = &self.file_infos[file_id]; - let abs_offset = fi.offset_in_torrent + offset; - let piece_id: u32 = (abs_offset / self.lengths.default_piece_length() as u64).try_into()?; - let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?; - - let mut g = self.map.write(); - let inmp = g.get_mut(&piece_id).context("piece does not exist")?; - - inmp.has_been_validated = true; - - Ok(()) - } } diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index ff062e84..74ac5961 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -188,9 +188,4 @@ impl TorrentStorage for FilesystemStorage { self.opened_files = files; Ok(()) } - - fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> { - // No job has to be done for this storage - Ok(()) - } } diff --git a/crates/librqbit/src/storage/filesystem/mmap.rs b/crates/librqbit/src/storage/filesystem/mmap.rs index e001dfe2..5b92657a 100644 --- a/crates/librqbit/src/storage/filesystem/mmap.rs +++ b/crates/librqbit/src/storage/filesystem/mmap.rs @@ -112,9 +112,4 @@ impl TorrentStorage for MmapFilesystemStorage { self.opened_mmaps = mmaps; Ok(()) } - - fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> { - // No job has to be done for this storage - Ok(()) - } } diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index 0d53c340..efefa6e4 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -11,6 +11,8 @@ use std::{ path::Path, }; +use librqbit_core::lengths::ValidPieceIndex; + use crate::torrent_state::ManagedTorrentShared; pub trait StorageFactory: Send + Sync + Any { @@ -99,7 +101,10 @@ pub trait TorrentStorage: Send + Sync { fn take(&self) -> anyhow::Result>; /// Callback called every time a piece has completed and has been validated. - fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()>; + /// Default implementation does nothing, but can be override in trait implementations. + fn on_piece_completed(&self, _piece_index: ValidPieceIndex) -> anyhow::Result<()> { + Ok(()) + } } impl TorrentStorage for Box { @@ -131,7 +136,7 @@ impl TorrentStorage for Box { (**self).init(meta) } - fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()> { - (**self).on_piece_completed(file_id, offset) + fn on_piece_completed(&self, piece_id: ValidPieceIndex) -> anyhow::Result<()> { + (**self).on_piece_completed(piece_id) } }