[Feature] Allow TorrentStorage to remove pieces while downloading #224

Closed
cocool97 opened this issue Aug 28, 2024 · 1 comment

@cocool97

Work has begun in #219 to allow removing pieces from TorrentStorage once they have been read.
But there is still a problem: once removed, the pieces keep being downloaded over and over.

I think it would be nice to add a new method to TorrentStorage, piece_has_been_downloaded(piece_id), to check whether a piece has already been downloaded in the session context, and optionally another method, reset_pieces_state(from_piece_id), that would be called when seeking in the torrent to reset all pieces after this piece_id, allowing the Session to re-download them (as we obviously have to download these pieces again).
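To make the proposal concrete, here is a minimal, self-contained sketch of what the two methods could look like. Everything in it is hypothetical: the `PieceStateTracking` trait, the `DownloadedPieces` type, the `mark_downloaded` helper and the plain `u32` piece id are stand-ins for illustration, not part of librqbit. It also assumes that the reset includes `from_piece_id` itself.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for librqbit's `ValidPieceIndex`.
type PieceId = u32;

/// Hypothetical trait holding the two proposed methods.
trait PieceStateTracking {
    /// Has this piece already been downloaded during the session?
    fn piece_has_been_downloaded(&self, piece_id: PieceId) -> bool;
    /// Forget every piece at or after `from_piece_id` (assumed inclusive),
    /// so that it can be downloaded again after a seek.
    fn reset_pieces_state(&mut self, from_piece_id: PieceId);
}

/// Hypothetical bookkeeping: the set of pieces seen so far.
#[derive(Default)]
struct DownloadedPieces {
    seen: HashSet<PieceId>,
}

impl DownloadedPieces {
    /// Record a piece as downloaded (illustrative helper).
    fn mark_downloaded(&mut self, piece_id: PieceId) {
        self.seen.insert(piece_id);
    }
}

impl PieceStateTracking for DownloadedPieces {
    fn piece_has_been_downloaded(&self, piece_id: PieceId) -> bool {
        self.seen.contains(&piece_id)
    }

    fn reset_pieces_state(&mut self, from_piece_id: PieceId) {
        // Keep only the pieces strictly before the seek target.
        self.seen.retain(|&id| id < from_piece_id);
    }
}
```

With this sketch, a piece removed from memory after being read would still report as downloaded until a seek resets it.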

What do you think about it?

Here is a custom storage implementation that removes pieces once they have been read:

use std::{collections::HashMap, path::Path};

use anyhow::Context;
use librqbit::{storage::TorrentStorage, FileInfos, ManagedTorrentShared};
use librqbit_core::lengths::{Lengths, ValidPieceIndex};
use parking_lot::RwLock;

use crate::in_memory_piece::InMemoryPiece;

// Defined in the `in_memory_piece` module imported above.
pub struct InMemoryPiece {
    pub content: Box<[u8]>,
    pub has_been_validated: bool,
}

impl InMemoryPiece {
    pub fn new(l: &Lengths) -> Self {
        let v = vec![0; l.default_piece_length() as usize].into_boxed_slice();
        Self {
            content: v,
            has_been_validated: false,
        }
    }

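    /// A piece can be dropped once it has been marked as validated and a
    /// read has reached the end of its buffer.
    pub fn can_be_discard(&self, upper_bound_offset: usize) -> bool {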
        self.has_been_validated && upper_bound_offset >= self.content.len()
    }
}

pub struct InMemoryStorage {
    lengths: Lengths,
    file_infos: FileInfos,
    map: RwLock<HashMap<ValidPieceIndex, InMemoryPiece>>,
    max_ram_size_per_torrent: usize,
}

impl InMemoryStorage {
    pub fn new(
        lengths: Lengths,
        file_infos: FileInfos,
        max_ram_size_per_torrent: usize,
    ) -> anyhow::Result<Self> {
        // Reject torrents whose pieces do not fit in the configured memory budget.
        let max_pieces = max_ram_size_per_torrent / lengths.default_piece_length() as usize;
        if max_pieces == 0 {
            anyhow::bail!("pieces too large");
        }

        Ok(Self {
            lengths,
            file_infos,
            map: RwLock::new(HashMap::new()),
            max_ram_size_per_torrent,
        })
    }
}

impl TorrentStorage for InMemoryStorage {
    fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> {
        // log::debug!("pread_exact {file_id} {offset}");
        let fi = &self.file_infos[file_id];
        let abs_offset = fi.offset_in_torrent + offset;
        let piece_id: u32 = (abs_offset / self.lengths.default_piece_length() as u64).try_into()?;
        let piece_offset: usize =
            (abs_offset % self.lengths.default_piece_length() as u64).try_into()?;
        let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?;

        log::debug!("[READ] piece_id={piece_id}; piece_offset={piece_offset}");

        let mut g = self.map.write();
        // Get and remove this data from buffer to free space
        let inmp = g.get(&piece_id).context("piece expired")?;
        let upper_bound_offset = piece_offset + buf.len();
        buf.copy_from_slice(&inmp.content[piece_offset..upper_bound_offset]);

        if inmp.can_be_discard(upper_bound_offset) {
            log::info!("Can discard {piece_id}...");
            let _ = g.remove(&piece_id);
        }

        Ok(())
    }

    fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> {
        // log::debug!("pwrite_all {file_id} {offset}");
        let fi = &self.file_infos[file_id];
        let abs_offset = fi.offset_in_torrent + offset;
        let piece_id: u32 = (abs_offset / self.lengths.default_piece_length() as u64).try_into()?;
        let piece_offset: usize =
            (abs_offset % self.lengths.default_piece_length() as u64).try_into()?;
        let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?;

        log::debug!("[WRITE] piece_id={piece_id}; piece_offset={piece_offset}");

        let mut g = self.map.write();
        let inmp = g
            .entry(piece_id)
            .or_insert_with(|| InMemoryPiece::new(&self.lengths));
        inmp.content[piece_offset..(piece_offset + buf.len())].copy_from_slice(buf);
        Ok(())
    }

    fn remove_file(&self, _file_id: usize, _filename: &Path) -> anyhow::Result<()> {
        // log::debug!("remove_file {file_id} {filename:?}");
        Ok(())
    }

    fn ensure_file_length(&self, _file_id: usize, _length: u64) -> anyhow::Result<()> {
        // log::debug!("ensure {file_id} {length}");
        Ok(())
    }

    fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>> {
        // Move the current piece map out, leaving an empty one behind.
        let map = std::mem::take(&mut *self.map.write());
        Ok(Box::new(Self {
            lengths: self.lengths,
            map: RwLock::new(map),
            file_infos: self.file_infos.clone(),
            max_ram_size_per_torrent: self.max_ram_size_per_torrent,
        }))
    }

    fn init(&mut self, _meta: &ManagedTorrentShared) -> anyhow::Result<()> {
        // log::debug!("init {:?}", meta.file_infos);
        Ok(())
    }

    fn remove_directory_if_empty(&self, _path: &Path) -> anyhow::Result<()> {
        // log::debug!("remove dir {path:?}");
        Ok(())
    }

    fn on_piece_completed(&self, piece_id: ValidPieceIndex) -> anyhow::Result<()> {
        let mut g = self.map.write();
        let inmp = g.get_mut(&piece_id).context("piece does not exist")?;

        inmp.has_been_validated = true;

        Ok(())
    }
}
@ikatson
Owner

ikatson commented Aug 28, 2024

I don't like that idea: it's too hacky, too specific to this one use-case, and doesn't generalize.

To support streaming and deleting files you need at least:

  • the stream to control when the pieces are deleted (not torrent storage). I.e. when the stream has returned a piece and there are no other streams that need it, delete it
  • the torrent storage to be able to actually delete the pieces, which implies access to live torrent state (you need to set the values in chunk tracker). Having "on_piece_completed" isn't enough: you can't delete the piece there, as there's no guarantee the stream won't need it
  • the "reserve_next_needed_piece" to do nothing (it does it already if "selected_files" is an empty list)
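As a rough illustration of the first point only, the "delete when no other stream needs it" rule can be sketched as per-piece reference counting. This is a self-contained sketch under assumptions: `StreamPieceRefs`, `acquire` and `release` are hypothetical names, not librqbit APIs, and the sketch does not model the chunk tracker update that the second point requires.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for librqbit's `ValidPieceIndex`.
type PieceId = u32;

/// Hypothetical tracker: how many open streams still need each piece.
#[derive(Default)]
struct StreamPieceRefs {
    refs: HashMap<PieceId, usize>,
}

impl StreamPieceRefs {
    /// A stream declares that it will need `piece`.
    fn acquire(&mut self, piece: PieceId) {
        *self.refs.entry(piece).or_insert(0) += 1;
    }

    /// A stream has finished with `piece`. Returns `true` when no other
    /// stream needs it any more, i.e. the caller may delete it.
    fn release(&mut self, piece: PieceId) -> bool {
        let remaining = match self.refs.get_mut(&piece) {
            Some(count) => {
                // Counts stored in the map are always >= 1.
                *count -= 1;
                *count
            }
            // Unknown piece: nothing to release, nothing to delete.
            None => return false,
        };
        if remaining == 0 {
            self.refs.remove(&piece);
            true
        } else {
            false
        }
    }
}
```

In this sketch the decision to delete sits with the streams, and the storage would only act once `release` returns `true`.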

So I don't think torrent storage in its current form would fit that, and also your changes aren't enough and I don't think they are the right way to do it.

If you want to do that I want to see that in one PR stack working together with e.g. an example memory storage. It doesn't sound hard.
