diff --git a/.gitignore b/.gitignore index ea8c4bf..0a25f36 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +/out diff --git a/src/client/file_manager.rs b/src/client/file_manager.rs new file mode 100644 index 0000000..950d43f --- /dev/null +++ b/src/client/file_manager.rs @@ -0,0 +1,63 @@ +use std::{ + fs::{create_dir_all, File, OpenOptions}, + os::unix::fs::FileExt, +}; + +use crate::metainfo::Info; + +#[derive(Debug)] +pub struct FileManager { + piece_length: u64, + files: Vec<(File, u64)>, +} + +impl FileManager { + pub fn new(output_dir: String, info_dict: &Info) -> Self { + create_dir_all(&output_dir).unwrap(); + match info_dict { + Info::SingleFile(info) => { + let file_path = format!("{}/{}", output_dir, info.name); + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(file_path) + .unwrap(); + FileManager { + piece_length: info.base_info.piece_length as u64, + files: vec![(file, info.length)], + } + } + Info::MultiFile(info) => { + let mut files = Vec::new(); + for file_info in &info.files { + let file_path = format!("{}/{}", output_dir, file_info.path.join("/")); + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(file_path) + .unwrap(); + files.push((file, file_info.length)); + } + FileManager { + piece_length: info.base_info.piece_length as u64, + files, + } + } + } + } + + pub fn save_block(&mut self, piece_index: usize, begin: u32, data: Vec) { + let byte_offset = self.piece_length * piece_index as u64 + begin as u64; + let mut accumulated_size = 0; + for (file, file_size) in &mut self.files { + if byte_offset < accumulated_size + *file_size { + file.write_at(&data, byte_offset - accumulated_size) + .unwrap(); + break; + } + accumulated_size += *file_size; + } + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index a79859c..cb7e938 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -16,6 +16,7 @@ use tokio::{ }; mod bitfield; +mod file_manager; mod message; mod pieces; @@ -139,8 +140,8 @@ pub struct Client { } impl Client { - pub fn new(tracker: Tracker) -> Self { - let piece_scheduler = PieceScheduler::new(&tracker.get_metainfo().info); + pub fn new(tracker: Tracker, output_dir: String) -> Self { + let piece_scheduler = PieceScheduler::new(&tracker.get_metainfo().info, output_dir); Self { tracker, peers: Arc::new(RwLock::new(HashMap::new())), @@ -153,7 +154,7 @@ impl Client { } pub async fn download(&mut self) -> Result<(), ClientError> { - self.connect_to_peers(10).await?; + self.connect_to_peers(30).await?; let _ = tokio::join!( self.send_messages(), @@ -176,8 +177,7 @@ impl Client { let start_time = self.start_time; tokio::spawn(async move { - loop { - // println!("Processing messages..."); + while *total_downloaded.lock().await < total_length { let Some((peer_id, message)) = receive_queue.lock().await.pop_front() else { continue; }; @@ -383,12 +383,13 @@ impl Client { let peers = Arc::clone(&self.peers); let receive_queue = Arc::clone(&self.receive_queue); let piece_scheduler = Arc::clone(&self.piece_scheduler); + let total_length = self.tracker.get_metainfo().get_length(); + let total_downloaded = Arc::clone(&self.total_downloaded); + tokio::spawn(async move { let mut peers_to_remove = Vec::new(); - loop { - // println!("Retrieving messages..."); + while *total_downloaded.lock().await < total_length { for (peer_id, peer) in peers.read().await.iter() { - // println!("pre-receive"); match receive_message(&peer.lock().await.stream).await { Ok(message) => { println!( @@ -402,7 +403,7 @@ impl Client { .push_back((peer_id.clone(), message)); } Err(ReceiveError::WouldBlock) => { - // continue; + continue; } Err(e) => { println!( @@ -413,7 +414,6 @@ impl Client { peers_to_remove.push(peer_id.clone()); } } - // println!("post-receive"); peer.lock().await.last_touch = Utc::now(); yield_now().await; } @@ -435,9 +435,11 @@ impl Client { let peers = Arc::clone(&self.peers); let send_queue = Arc::clone(&self.send_queue); let piece_scheduler = Arc::clone(&self.piece_scheduler); + let total_length = self.tracker.get_metainfo().get_length(); + let total_downloaded = Arc::clone(&self.total_downloaded); + tokio::spawn(async move { - loop { - // println!("Sending messages..."); + while *total_downloaded.lock().await < total_length { let Some((peer_id, message)) = send_queue.lock().await.pop_front() else { continue; }; diff --git a/src/client/pieces.rs b/src/client/pieces.rs index 33fd990..4146c56 100644 --- a/src/client/pieces.rs +++ b/src/client/pieces.rs @@ -2,7 +2,7 @@ use std::collections::HashSet; use crate::metainfo::Info; -use super::bitfield::Bitfield; +use super::{bitfield::Bitfield, file_manager::FileManager}; pub const BLOCK_SIZE: u32 = 2 << 13; // 16KB @@ -10,8 +10,8 @@ pub const BLOCK_SIZE: u32 = 2 << 13; // 16KB pub struct Block { begin: u32, length: u32, - data: Vec, requested: bool, + completed: bool, } #[derive(Debug)] @@ -26,12 +26,13 @@ pub struct Piece { #[derive(Debug)] pub struct PieceScheduler { pieces: Vec, + file_manager: FileManager, any_complete: bool, } impl PieceScheduler { - pub fn new(info_dict: &Info) -> Self { - let (piece_hashes, piece_length, file_size) = match info_dict { + pub fn new(info_dict: &Info, output_dir: String) -> Self { + let (piece_hashes, piece_length, total_size) = match info_dict { Info::SingleFile(info) => ( info.base_info.pieces.clone(), info.base_info.piece_length, @@ -49,7 +50,7 @@ impl PieceScheduler { "piece length must be a multiple of the block size" ); - let mut remaining_size = file_size as u32; + let mut remaining_size = total_size as u32; let mut pieces = Vec::new(); for (i, hash) in piece_hashes.iter().enumerate() { let mut blocks = Vec::new(); @@ -63,8 +64,8 @@ impl PieceScheduler { let block = Block { begin: offset, length, - data: Vec::new(), requested: false, + completed: false, }; blocks.push(block); @@ -85,6 +86,7 @@ impl PieceScheduler { Self { pieces, any_complete: false, + file_manager: FileManager::new(output_dir, info_dict), } } @@ -105,7 +107,7 @@ impl PieceScheduler { .iter() .filter(|p| { !p.completed - && p.blocks.iter().any(|b| !b.requested && b.data.is_empty()) + && p.blocks.iter().any(|b| !b.requested && !b.completed) && p.peers.contains(peer_id) }) .min_by_key(|p| p.peers.len()) @@ -124,7 +126,8 @@ impl PieceScheduler { let block_bucket: usize = begin.div_ceil(BLOCK_SIZE).try_into().unwrap(); let block = &mut piece.blocks[block_bucket]; - block.data = data; + self.file_manager.save_block(index, begin, data); + block.completed = true; } pub fn add_peer_count(&mut self, peer_id: &Vec, bitfield: &Bitfield) { @@ -170,7 +173,7 @@ impl PieceScheduler { let block = piece .blocks .iter() - .find(|b| !b.requested && b.data.is_empty()) + .find(|b| !b.requested && !b.completed) .unwrap(); (piece.index as u32, block.begin, block.length) }); diff --git a/src/main.rs b/src/main.rs index 90af2f2..5f9d0f1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,9 @@ use rustorrent::{bencode::BencodeValue, client::Client, tracker::Tracker}; #[command(version, about, long_about = None)] struct Args { file_path: String, + + #[arg(short, long)] + output_dir: String, } fn read_file(filename: &str) -> Result, std::io::Error> { @@ -38,7 +41,7 @@ async fn main() { } let tracker = Tracker::new(bencode_value).expect("Failed to create tracker"); - let mut client = Client::new(tracker); + let mut client = Client::new(tracker, args.output_dir); match client.download().await { Ok(()) => println!("Download completed"),