Skip to content

Commit

Permalink
save to file
Browse files Browse the repository at this point in the history
  • Loading branch information
menghaoyu2002 committed Jun 9, 2024
1 parent cc598b8 commit 88a8344
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
/out
63 changes: 63 additions & 0 deletions src/client/file_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::{
fs::{create_dir_all, File, OpenOptions},
os::unix::fs::FileExt,
};

use crate::metainfo::Info;

#[derive(Debug)]
pub struct FileManager {
piece_length: u64,
files: Vec<(File, u64)>,
}

impl FileManager {
pub fn new(output_dir: String, info_dict: &Info) -> Self {
create_dir_all(&output_dir).unwrap();
match info_dict {
Info::SingleFile(info) => {
let file_path = format!("{}/{}", output_dir, info.name);
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(file_path)
.unwrap();
FileManager {
piece_length: info.base_info.piece_length as u64,
files: vec![(file, info.length)],
}
}
Info::MultiFile(info) => {
let mut files = Vec::new();
for file_info in &info.files {
let file_path = format!("{}/{}", output_dir, file_info.path.join("/"));
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(file_path)
.unwrap();
files.push((file, file_info.length));
}
FileManager {
piece_length: info.base_info.piece_length as u64,
files,
}
}
}
}

pub fn save_block(&mut self, piece_index: usize, begin: u32, data: Vec<u8>) {
let byte_offset = self.piece_length * piece_index as u64 + begin as u64;
let mut accumulated_size = 0;
for (file, file_size) in &mut self.files {
if byte_offset < accumulated_size + *file_size {
file.write_at(&data, byte_offset - accumulated_size)
.unwrap();
break;
}
accumulated_size += *file_size;
}
}
}
26 changes: 14 additions & 12 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::{
};

mod bitfield;
mod file_manager;
mod message;
mod pieces;

Expand Down Expand Up @@ -139,8 +140,8 @@ pub struct Client {
}

impl Client {
pub fn new(tracker: Tracker) -> Self {
let piece_scheduler = PieceScheduler::new(&tracker.get_metainfo().info);
pub fn new(tracker: Tracker, output_dir: String) -> Self {
let piece_scheduler = PieceScheduler::new(&tracker.get_metainfo().info, output_dir);
Self {
tracker,
peers: Arc::new(RwLock::new(HashMap::new())),
Expand All @@ -153,7 +154,7 @@ impl Client {
}

pub async fn download(&mut self) -> Result<(), ClientError> {
self.connect_to_peers(10).await?;
self.connect_to_peers(30).await?;

let _ = tokio::join!(
self.send_messages(),
Expand All @@ -176,8 +177,7 @@ impl Client {
let start_time = self.start_time;

tokio::spawn(async move {
loop {
// println!("Processing messages...");
while *total_downloaded.lock().await < total_length {
let Some((peer_id, message)) = receive_queue.lock().await.pop_front() else {
continue;
};
Expand Down Expand Up @@ -383,12 +383,13 @@ impl Client {
let peers = Arc::clone(&self.peers);
let receive_queue = Arc::clone(&self.receive_queue);
let piece_scheduler = Arc::clone(&self.piece_scheduler);
let total_length = self.tracker.get_metainfo().get_length();
let total_downloaded = Arc::clone(&self.total_downloaded);

tokio::spawn(async move {
let mut peers_to_remove = Vec::new();
loop {
// println!("Retrieving messages...");
while *total_downloaded.lock().await < total_length {
for (peer_id, peer) in peers.read().await.iter() {
// println!("pre-receive");
match receive_message(&peer.lock().await.stream).await {
Ok(message) => {
println!(
Expand All @@ -402,7 +403,7 @@ impl Client {
.push_back((peer_id.clone(), message));
}
Err(ReceiveError::WouldBlock) => {
// continue;
continue;
}
Err(e) => {
println!(
Expand All @@ -413,7 +414,6 @@ impl Client {
peers_to_remove.push(peer_id.clone());
}
}
// println!("post-receive");
peer.lock().await.last_touch = Utc::now();
yield_now().await;
}
Expand All @@ -435,9 +435,11 @@ impl Client {
let peers = Arc::clone(&self.peers);
let send_queue = Arc::clone(&self.send_queue);
let piece_scheduler = Arc::clone(&self.piece_scheduler);
let total_length = self.tracker.get_metainfo().get_length();
let total_downloaded = Arc::clone(&self.total_downloaded);

tokio::spawn(async move {
loop {
// println!("Sending messages...");
while *total_downloaded.lock().await < total_length {
let Some((peer_id, message)) = send_queue.lock().await.pop_front() else {
continue;
};
Expand Down
21 changes: 12 additions & 9 deletions src/client/pieces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ use std::collections::HashSet;

use crate::metainfo::Info;

use super::bitfield::Bitfield;
use super::{bitfield::Bitfield, file_manager::FileManager};

pub const BLOCK_SIZE: u32 = 2 << 13; // 16KB

#[derive(Debug)]
pub struct Block {
begin: u32,
length: u32,
data: Vec<u8>,
requested: bool,
completed: bool,
}

#[derive(Debug)]
Expand All @@ -26,12 +26,13 @@ pub struct Piece {
#[derive(Debug)]
pub struct PieceScheduler {
pieces: Vec<Piece>,
file_manager: FileManager,
any_complete: bool,
}

impl PieceScheduler {
pub fn new(info_dict: &Info) -> Self {
let (piece_hashes, piece_length, file_size) = match info_dict {
pub fn new(info_dict: &Info, output_dir: String) -> Self {
let (piece_hashes, piece_length, total_size) = match info_dict {
Info::SingleFile(info) => (
info.base_info.pieces.clone(),
info.base_info.piece_length,
Expand All @@ -49,7 +50,7 @@ impl PieceScheduler {
"piece length must be a multiple of the block size"
);

let mut remaining_size = file_size as u32;
let mut remaining_size = total_size as u32;
let mut pieces = Vec::new();
for (i, hash) in piece_hashes.iter().enumerate() {
let mut blocks = Vec::new();
Expand All @@ -63,8 +64,8 @@ impl PieceScheduler {
let block = Block {
begin: offset,
length,
data: Vec::new(),
requested: false,
completed: false,
};
blocks.push(block);

Expand All @@ -85,6 +86,7 @@ impl PieceScheduler {
Self {
pieces,
any_complete: false,
file_manager: FileManager::new(output_dir, info_dict),
}
}

Expand All @@ -105,7 +107,7 @@ impl PieceScheduler {
.iter()
.filter(|p| {
!p.completed
&& p.blocks.iter().any(|b| !b.requested && b.data.is_empty())
&& p.blocks.iter().any(|b| !b.requested && !b.completed)
&& p.peers.contains(peer_id)
})
.min_by_key(|p| p.peers.len())
Expand All @@ -124,7 +126,8 @@ impl PieceScheduler {

let block_bucket: usize = begin.div_ceil(BLOCK_SIZE).try_into().unwrap();
let block = &mut piece.blocks[block_bucket];
block.data = data;
self.file_manager.save_block(index, begin, data);
block.completed = true;
}

pub fn add_peer_count(&mut self, peer_id: &Vec<u8>, bitfield: &Bitfield) {
Expand Down Expand Up @@ -170,7 +173,7 @@ impl PieceScheduler {
let block = piece
.blocks
.iter()
.find(|b| !b.requested && b.data.is_empty())
.find(|b| !b.requested && !b.completed)
.unwrap();
(piece.index as u32, block.begin, block.length)
});
Expand Down
5 changes: 4 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use rustorrent::{bencode::BencodeValue, client::Client, tracker::Tracker};
#[command(version, about, long_about = None)]
struct Args {
file_path: String,

#[arg(short, long)]
output_dir: String,
}

fn read_file(filename: &str) -> Result<Vec<u8>, std::io::Error> {
Expand Down Expand Up @@ -38,7 +41,7 @@ async fn main() {
}

let tracker = Tracker::new(bencode_value).expect("Failed to create tracker");
let mut client = Client::new(tracker);
let mut client = Client::new(tracker, args.output_dir);

match client.download().await {
Ok(()) => println!("Download completed"),
Expand Down

0 comments on commit 88a8344

Please sign in to comment.