Skip to content

Commit

Permalink
Clear have_bitfield on error
Browse files Browse the repository at this point in the history
  • Loading branch information
ikatson committed Aug 21, 2024
1 parent b7ed850 commit c697809
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 32 deletions.
5 changes: 5 additions & 0 deletions crates/librqbit/src/bitv_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{api::TorrentIdOrHash, bitv::BitV, type_aliases::BF};
#[async_trait::async_trait]
pub trait BitVFactory: Send + Sync {
async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result<Option<Box<dyn BitV>>>;
async fn clear(&self, id: TorrentIdOrHash) -> anyhow::Result<()>;
async fn store_initial_check(
&self,
id: TorrentIdOrHash,
Expand All @@ -18,6 +19,10 @@ impl BitVFactory for NonPersistentBitVFactory {
Ok(None)
}

async fn clear(&self, _id: TorrentIdOrHash) -> anyhow::Result<()> {
Ok(())
}

async fn store_initial_check(
&self,
_id: TorrentIdOrHash,
Expand Down
1 change: 1 addition & 0 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,7 @@ impl Session {
minfo.clone(),
only_files.clone(),
minfo.storage_factory.create_and_init(&minfo)?,
false,
));
let handle = Arc::new(ManagedTorrent {
locked: RwLock::new(ManagedTorrentLocked {
Expand Down
8 changes: 8 additions & 0 deletions crates/librqbit/src/session_persistence/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ impl BitVFactory for JsonSessionPersistenceStore {
Ok(Some(MmapBitV::new(f)?.into_dyn()))
}

async fn clear(&self, id: TorrentIdOrHash) -> anyhow::Result<()> {
let h = self.to_hash(id).await?;
let filename = self.bitv_filename(&h);
tokio::fs::remove_file(&filename)
.await
.with_context(|| format!("error removing {filename:?}"))
}

async fn store_initial_check(
&self,
id: TorrentIdOrHash,
Expand Down
27 changes: 27 additions & 0 deletions crates/librqbit/src/session_persistence/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,4 +313,31 @@ impl BitVFactory for PostgresSessionStorage {
bf.flush()?;
Ok(bf.into_dyn())
}

async fn clear(&self, id: TorrentIdOrHash) -> anyhow::Result<()> {
macro_rules! exec {
($q:expr, $v:expr) => {
sqlx::query($q)
.bind($v)
.execute(&self.pool)
.await
.context($q)
.context("error executing query")?
};
}

match id {
TorrentIdOrHash::Id(id) => {
let id: i32 = id.try_into()?;
exec!("UPDATE torrents SET have_bitfield = NULL WHERE id = $1", id);
}
TorrentIdOrHash::Hash(h) => {
exec!(
"UPDATE torrents SET have_bitfield = NULL WHERE info_hash = $1",
&h.0[..]
);
}
}
Ok(())
}
}
62 changes: 38 additions & 24 deletions crates/librqbit/src/torrent_state/initializing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use tracing::{debug, info, warn};
use crate::{
api::TorrentIdOrHash,
bitv::BitV,
bitv_factory::BitVFactory,
chunk_tracker::ChunkTracker,
file_ops::FileOps,
type_aliases::{FileStorage, BF},
Expand All @@ -23,9 +22,10 @@ use super::{paused::TorrentStatePaused, ManagedTorrentShared};

pub struct TorrentStateInitializing {
pub(crate) files: FileStorage,
pub(crate) meta: Arc<ManagedTorrentShared>,
pub(crate) shared: Arc<ManagedTorrentShared>,
pub(crate) only_files: Option<Vec<usize>>,
pub(crate) checked_bytes: AtomicU64,
previously_errored: bool,
}

fn compute_selected_pieces(
Expand All @@ -51,12 +51,14 @@ impl TorrentStateInitializing {
meta: Arc<ManagedTorrentShared>,
only_files: Option<Vec<usize>>,
files: FileStorage,
previously_errored: bool,
) -> Self {
Self {
meta,
shared: meta,
only_files,
files,
checked_bytes: AtomicU64::new(0),
previously_errored,
}
}

Expand All @@ -65,18 +67,30 @@ impl TorrentStateInitializing {
.load(std::sync::atomic::Ordering::Relaxed)
}

pub async fn check(
&self,
bitv_factory: Arc<dyn BitVFactory>,
) -> anyhow::Result<TorrentStatePaused> {
let id: TorrentIdOrHash = self.meta.info_hash.into();
let mut have_pieces = bitv_factory
.load(id)
.await
.context("error loading have_pieces")?;
pub async fn check(&self) -> anyhow::Result<TorrentStatePaused> {
let id: TorrentIdOrHash = self.shared.info_hash.into();
let bitv_factory = self
.shared
.session
.upgrade()
.context("session is dead")?
.bitv_factory
.clone();
let mut have_pieces = if self.previously_errored {
if let Err(e) = bitv_factory.clear(id).await {
warn!(error=?e, "error clearing bitfield");
}
None
} else {
bitv_factory
.load(id)
.await
.context("error loading have_pieces")?
};

if let Some(hp) = have_pieces.as_ref() {
let actual = hp.as_bytes().len();
let expected = self.meta.lengths.piece_bitfield_bytes();
let expected = self.shared.lengths.piece_bitfield_bytes();
if actual != expected {
warn!(
actual,
Expand All @@ -90,12 +104,12 @@ impl TorrentStateInitializing {
Some(h) => h,
None => {
info!("Doing initial checksum validation, this might take a while...");
let have_pieces = self.meta.spawner.spawn_block_in_place(|| {
let have_pieces = self.shared.spawner.spawn_block_in_place(|| {
FileOps::new(
&self.meta.info,
&self.shared.info,
&self.files,
&self.meta.file_infos,
&self.meta.lengths,
&self.shared.file_infos,
&self.shared.lengths,
)
.initial_check(&self.checked_bytes)
})?;
Expand All @@ -107,16 +121,16 @@ impl TorrentStateInitializing {
};

let selected_pieces = compute_selected_pieces(
&self.meta.lengths,
&self.shared.lengths,
self.only_files.as_deref(),
&self.meta.file_infos,
&self.shared.file_infos,
);

let chunk_tracker = ChunkTracker::new(
have_pieces.into_dyn(),
selected_pieces,
self.meta.lengths,
&self.meta.file_infos,
self.shared.lengths,
&self.shared.file_infos,
)
.context("error creating chunk tracker")?;

Expand All @@ -130,8 +144,8 @@ impl TorrentStateInitializing {
);

// Ensure file lenghts are correct, and reopen read-only.
self.meta.spawner.spawn_block_in_place(|| {
for (idx, fi) in self.meta.file_infos.iter().enumerate() {
self.shared.spawner.spawn_block_in_place(|| {
for (idx, fi) in self.shared.file_infos.iter().enumerate() {
if self
.only_files
.as_ref()
Expand All @@ -158,7 +172,7 @@ impl TorrentStateInitializing {
})?;

let paused = TorrentStatePaused {
info: self.meta.clone(),
shared: self.shared.clone(),
files: self.files.take()?,
chunk_tracker,
streams: Arc::new(Default::default()),
Expand Down
10 changes: 5 additions & 5 deletions crates/librqbit/src/torrent_state/live/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl TorrentStateLive {
) -> anyhow::Result<Arc<Self>> {
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
let session = paused
.info
.shared
.session
.upgrade()
.context("session is dead, cannot start torrent")?;
Expand All @@ -230,11 +230,11 @@ impl TorrentStateLive {

// TODO: make it configurable
let file_priorities = {
let mut pri = (0..paused.info.file_infos.len()).collect::<Vec<usize>>();
let mut pri = (0..paused.shared.file_infos.len()).collect::<Vec<usize>>();
// sort by filename, cause many torrents have random sort order.
pri.sort_unstable_by_key(|id| {
paused
.info
.shared
.file_infos
.get(*id)
.map(|fi| fi.relative_filename.as_path())
Expand All @@ -245,7 +245,7 @@ impl TorrentStateLive {
let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128);

let state = Arc::new(TorrentStateLive {
torrent: paused.info.clone(),
torrent: paused.shared.clone(),
peers: PeerStates {
session_stats: session_stats.clone(),
stats: Default::default(),
Expand Down Expand Up @@ -668,7 +668,7 @@ impl TorrentStateLive {

// g.chunks;
Ok(TorrentStatePaused {
info: self.torrent.clone(),
shared: self.torrent.clone(),
files: self.files.take()?,
chunk_tracker,
streams: self.streams.clone(),
Expand Down
3 changes: 2 additions & 1 deletion crates/librqbit/src/torrent_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl ManagedTorrent {
.await
.context("bug: concurrent init semaphore was closed")?;

match init.check(session.bitv_factory.clone()).await {
match init.check().await {
Ok(paused) => {
let mut g = t.locked.write();
if let ManagedTorrentState::Initializing(_) = &g.state {
Expand Down Expand Up @@ -365,6 +365,7 @@ impl ManagedTorrent {
self.shared.clone(),
g.only_files.clone(),
self.shared.storage_factory.create_and_init(self.shared())?,
true,
));
g.state = ManagedTorrentState::Initializing(initializing.clone());
drop(g);
Expand Down
4 changes: 2 additions & 2 deletions crates/librqbit/src/torrent_state/paused.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
use super::{streaming::TorrentStreams, ManagedTorrentShared};

pub struct TorrentStatePaused {
pub(crate) info: Arc<ManagedTorrentShared>,
pub(crate) shared: Arc<ManagedTorrentShared>,
pub(crate) files: FileStorage,
pub(crate) chunk_tracker: ChunkTracker,
pub(crate) streams: Arc<TorrentStreams>,
Expand All @@ -17,7 +17,7 @@ pub struct TorrentStatePaused {
impl TorrentStatePaused {
pub(crate) fn update_only_files(&mut self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
self.chunk_tracker
.update_only_files(self.info.info.iter_file_lengths()?, only_files)?;
.update_only_files(self.shared.info.iter_file_lengths()?, only_files)?;
Ok(())
}

Expand Down

0 comments on commit c697809

Please sign in to comment.