From c697809e50caf2c6b31f43e25ab94075a8e44e25 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 21 Aug 2024 18:21:15 +0100 Subject: [PATCH] Clear have_bitfield on error --- crates/librqbit/src/bitv_factory.rs | 5 ++ crates/librqbit/src/session.rs | 1 + .../librqbit/src/session_persistence/json.rs | 8 +++ .../src/session_persistence/postgres.rs | 27 ++++++++ .../src/torrent_state/initializing.rs | 62 ++++++++++++------- crates/librqbit/src/torrent_state/live/mod.rs | 10 +-- crates/librqbit/src/torrent_state/mod.rs | 3 +- crates/librqbit/src/torrent_state/paused.rs | 4 +- 8 files changed, 88 insertions(+), 32 deletions(-) diff --git a/crates/librqbit/src/bitv_factory.rs b/crates/librqbit/src/bitv_factory.rs index 57f07917..21b8724e 100644 --- a/crates/librqbit/src/bitv_factory.rs +++ b/crates/librqbit/src/bitv_factory.rs @@ -3,6 +3,7 @@ use crate::{api::TorrentIdOrHash, bitv::BitV, type_aliases::BF}; #[async_trait::async_trait] pub trait BitVFactory: Send + Sync { async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result>>; + async fn clear(&self, id: TorrentIdOrHash) -> anyhow::Result<()>; async fn store_initial_check( &self, id: TorrentIdOrHash, @@ -18,6 +19,10 @@ impl BitVFactory for NonPersistentBitVFactory { Ok(None) } + async fn clear(&self, _id: TorrentIdOrHash) -> anyhow::Result<()> { + Ok(()) + } + async fn store_initial_check( &self, _id: TorrentIdOrHash, diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 096e0864..f6b98efa 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1148,6 +1148,7 @@ impl Session { minfo.clone(), only_files.clone(), minfo.storage_factory.create_and_init(&minfo)?, + false, )); let handle = Arc::new(ManagedTorrent { locked: RwLock::new(ManagedTorrentLocked { diff --git a/crates/librqbit/src/session_persistence/json.rs b/crates/librqbit/src/session_persistence/json.rs index bd2d8885..1d262a56 100644 --- a/crates/librqbit/src/session_persistence/json.rs +++ b/crates/librqbit/src/session_persistence/json.rs @@ -196,6 +196,14 @@ impl BitVFactory for JsonSessionPersistenceStore { Ok(Some(MmapBitV::new(f)?.into_dyn())) } + async fn clear(&self, id: TorrentIdOrHash) -> anyhow::Result<()> { + let h = self.to_hash(id).await?; + let filename = self.bitv_filename(&h); + tokio::fs::remove_file(&filename) + .await + .with_context(|| format!("error removing {filename:?}")) + } + async fn store_initial_check( &self, id: TorrentIdOrHash, diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index 10d593c6..62d31a1b 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -313,4 +313,31 @@ impl BitVFactory for PostgresSessionStorage { bf.flush()?; Ok(bf.into_dyn()) } + + async fn clear(&self, id: TorrentIdOrHash) -> anyhow::Result<()> { + macro_rules! exec { + ($q:expr, $v:expr) => { + sqlx::query($q) + .bind($v) + .execute(&self.pool) + .await + .context($q) + .context("error executing query")? + }; + } + + match id { + TorrentIdOrHash::Id(id) => { + let id: i32 = id.try_into()?; + exec!("UPDATE torrents SET have_bitfield = NULL WHERE id = $1", id); + } + TorrentIdOrHash::Hash(h) => { + exec!( + "UPDATE torrents SET have_bitfield = NULL WHERE info_hash = $1", + &h.0[..] + ); + } + } + Ok(()) + } } diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 915139b9..4745abb9 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -12,7 +12,6 @@ use tracing::{debug, info, warn}; use crate::{ api::TorrentIdOrHash, bitv::BitV, - bitv_factory::BitVFactory, chunk_tracker::ChunkTracker, file_ops::FileOps, type_aliases::{FileStorage, BF}, @@ -23,9 +22,10 @@ use super::{paused::TorrentStatePaused, ManagedTorrentShared}; pub struct TorrentStateInitializing { pub(crate) files: FileStorage, - pub(crate) meta: Arc, + pub(crate) shared: Arc, pub(crate) only_files: Option>, pub(crate) checked_bytes: AtomicU64, + previously_errored: bool, } fn compute_selected_pieces( @@ -51,12 +51,14 @@ impl TorrentStateInitializing { meta: Arc, only_files: Option>, files: FileStorage, + previously_errored: bool, ) -> Self { Self { - meta, + shared: meta, only_files, files, checked_bytes: AtomicU64::new(0), + previously_errored, } } @@ -65,18 +67,30 @@ impl TorrentStateInitializing { .load(std::sync::atomic::Ordering::Relaxed) } - pub async fn check( - &self, - bitv_factory: Arc, - ) -> anyhow::Result { - let id: TorrentIdOrHash = self.meta.info_hash.into(); - let mut have_pieces = bitv_factory - .load(id) - .await - .context("error loading have_pieces")?; + pub async fn check(&self) -> anyhow::Result { + let id: TorrentIdOrHash = self.shared.info_hash.into(); + let bitv_factory = self + .shared + .session + .upgrade() + .context("session is dead")? + .bitv_factory + .clone(); + let mut have_pieces = if self.previously_errored { + if let Err(e) = bitv_factory.clear(id).await { + warn!(error=?e, "error clearing bitfield"); + } + None + } else { + bitv_factory + .load(id) + .await + .context("error loading have_pieces")? + }; + if let Some(hp) = have_pieces.as_ref() { let actual = hp.as_bytes().len(); - let expected = self.meta.lengths.piece_bitfield_bytes(); + let expected = self.shared.lengths.piece_bitfield_bytes(); if actual != expected { warn!( actual, @@ -90,12 +104,12 @@ impl TorrentStateInitializing { Some(h) => h, None => { info!("Doing initial checksum validation, this might take a while..."); - let have_pieces = self.meta.spawner.spawn_block_in_place(|| { + let have_pieces = self.shared.spawner.spawn_block_in_place(|| { FileOps::new( - &self.meta.info, + &self.shared.info, &self.files, - &self.meta.file_infos, - &self.meta.lengths, + &self.shared.file_infos, + &self.shared.lengths, ) .initial_check(&self.checked_bytes) })?; @@ -107,16 +121,16 @@ impl TorrentStateInitializing { }; let selected_pieces = compute_selected_pieces( - &self.meta.lengths, + &self.shared.lengths, self.only_files.as_deref(), - &self.meta.file_infos, + &self.shared.file_infos, ); let chunk_tracker = ChunkTracker::new( have_pieces.into_dyn(), selected_pieces, - self.meta.lengths, - &self.meta.file_infos, + self.shared.lengths, + &self.shared.file_infos, ) .context("error creating chunk tracker")?; @@ -130,8 +144,8 @@ impl TorrentStateInitializing { ); // Ensure file lenghts are correct, and reopen read-only. - self.meta.spawner.spawn_block_in_place(|| { - for (idx, fi) in self.meta.file_infos.iter().enumerate() { + self.shared.spawner.spawn_block_in_place(|| { + for (idx, fi) in self.shared.file_infos.iter().enumerate() { if self .only_files .as_ref() @@ -158,7 +172,7 @@ impl TorrentStateInitializing { })?; let paused = TorrentStatePaused { - info: self.meta.clone(), + shared: self.shared.clone(), files: self.files.take()?, chunk_tracker, streams: Arc::new(Default::default()), diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 0c583ccf..64314bba 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -217,7 +217,7 @@ impl TorrentStateLive { ) -> anyhow::Result> { let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); let session = paused - .info + .shared .session .upgrade() .context("session is dead, cannot start torrent")?; @@ -230,11 +230,11 @@ impl TorrentStateLive { // TODO: make it configurable let file_priorities = { - let mut pri = (0..paused.info.file_infos.len()).collect::>(); + let mut pri = (0..paused.shared.file_infos.len()).collect::>(); // sort by filename, cause many torrents have random sort order. pri.sort_unstable_by_key(|id| { paused - .info + .shared .file_infos .get(*id) .map(|fi| fi.relative_filename.as_path()) @@ -245,7 +245,7 @@ impl TorrentStateLive { let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128); let state = Arc::new(TorrentStateLive { - torrent: paused.info.clone(), + torrent: paused.shared.clone(), peers: PeerStates { session_stats: session_stats.clone(), stats: Default::default(), @@ -668,7 +668,7 @@ impl TorrentStateLive { // g.chunks; Ok(TorrentStatePaused { - info: self.torrent.clone(), + shared: self.torrent.clone(), files: self.files.take()?, chunk_tracker, streams: self.streams.clone(), diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 0feaaf1f..54b6d044 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -311,7 +311,7 @@ impl ManagedTorrent { .await .context("bug: concurrent init semaphore was closed")?; - match init.check(session.bitv_factory.clone()).await { + match init.check().await { Ok(paused) => { let mut g = t.locked.write(); if let ManagedTorrentState::Initializing(_) = &g.state { @@ -365,6 +365,7 @@ impl ManagedTorrent { self.shared.clone(), g.only_files.clone(), self.shared.storage_factory.create_and_init(self.shared())?, + true, )); g.state = ManagedTorrentState::Initializing(initializing.clone()); drop(g); diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index b92304c6..9c78a267 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -8,7 +8,7 @@ use crate::{ use super::{streaming::TorrentStreams, ManagedTorrentShared}; pub struct TorrentStatePaused { - pub(crate) info: Arc, + pub(crate) shared: Arc, pub(crate) files: FileStorage, pub(crate) chunk_tracker: ChunkTracker, pub(crate) streams: Arc, @@ -17,7 +17,7 @@ pub struct TorrentStatePaused { impl TorrentStatePaused { pub(crate) fn update_only_files(&mut self, only_files: &HashSet) -> anyhow::Result<()> { self.chunk_tracker - .update_only_files(self.info.info.iter_file_lengths()?, only_files)?; + .update_only_files(self.shared.info.iter_file_lengths()?, only_files)?; Ok(()) }