Skip to content

Commit

Permalink
refactor: handle files on disk as PersistenceFile (#314)
Browse files Browse the repository at this point in the history
* refactor: `PersistenceFile`

* refactor: generalize `PersistenceFile`

* style: clppy suggestion
  • Loading branch information
Devdutt Shenoi authored Dec 8, 2023
1 parent 6ea7235 commit 6e65ba7
Showing 1 changed file with 119 additions and 93 deletions.
212 changes: 119 additions & 93 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use log::{self, debug, error, info, warn};
use seahash::hash;

use std::collections::VecDeque;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Write};
use std::fs::{self, OpenOptions};
use std::io::{self, copy, Write};
use std::mem;
use std::path::{Path, PathBuf};

Expand Down Expand Up @@ -94,22 +94,15 @@ impl Storage {

match &mut self.persistence {
Some(persistence) => {
let hash = hash(&self.current_write_file[..]);
let mut next_file = persistence.open_next_write_file()?;
info!(
"Flushing data to disk for stoarge: {}; path = {:?}",
self.name, next_file.path
);

next_file.file.write_all(&hash.to_be_bytes())?;
next_file.file.write_all(&self.current_write_file[..])?;
next_file.file.flush()?;
let NextFile { mut file, deleted } = persistence.open_next_write_file()?;
info!("Flushing data to disk for stoarge: {}; path = {:?}", self.name, file.path());
file.write(&mut self.current_write_file)?;

// 8 is the number of bytes the hash(u64) occupies
persistence.bytes_occupied += 8 + self.current_write_file.len();

self.current_write_file.clear();
Ok(next_file.deleted)

Ok(deleted)
}
None => {
// TODO(RT): Make sure that disk files starts with id 1 to represent in memory file
Expand Down Expand Up @@ -206,9 +199,91 @@ fn get_file_ids(path: &Path) -> Result<VecDeque<u64>, Error> {
Ok(file_ids)
}

struct NextFile {
path: PathBuf,
file: File,
/// A handle to describe a persistence file on disk
pub struct PersistenceFile<'a> {
/// Path to the persistence directory
dir: &'a Path,
/// Name of the file e.g. `backup@1`
file_name: String,
}

impl<'a> PersistenceFile<'a> {
pub fn new(dir: &'a Path, file_name: String) -> Result<Self, Error> {
Ok(Self { dir, file_name })
}

/// Path of persistence file when stored on disk
pub fn path(&self) -> PathBuf {
self.dir.join(&self.file_name)
}

// Moves the corrupt persistence file into special directory
fn handle_corrupt_file(&self) -> Result<(), Error> {
let path_src = self.path();
let dest_dir = self.dir.join("corrupted");
fs::create_dir_all(&dest_dir)?;
let path_dest = dest_dir.join(&self.file_name);

warn!("Moving corrupted file from {path_src:?} to {path_dest:?}");
fs::rename(path_src, path_dest)?;

Ok(())
}

/// Read contents of the persistence file from disk into buffer in memory
pub fn read(&mut self, buf: &mut BytesMut) -> Result<(), Error> {
let path = self.path();
let mut file = OpenOptions::new().read(true).open(path)?;

// Initialize buffer and load next read file
buf.clear();
copy(&mut file, &mut buf.writer())?;

// Verify with checksum
if buf.len() < 8 {
self.handle_corrupt_file()?;
return Err(Error::CorruptedFile);
}

let expected_hash = buf.get_u64();
let actual_hash = hash(&buf[..]);
if actual_hash != expected_hash {
self.handle_corrupt_file()?;
return Err(Error::CorruptedFile);
}

Ok(())
}

/// Write contents of buffer from memory onto the persistence file in disk
pub fn write(&mut self, buf: &mut BytesMut) -> Result<(), Error> {
let path = self.path();
let mut file = OpenOptions::new().write(true).create(true).open(path)?;

let hash = hash(&buf[..]);
file.write_all(&hash.to_be_bytes())?;
file.write_all(&buf[..])?;
file.flush()?;

Ok(())
}

/// Deletes the persistence file from disk
pub fn delete(&mut self) -> Result<u64, Error> {
let path = self.path();

// Query the fs to track size of removed persistence file
let metadata = fs::metadata(&path)?;
let bytes_occupied = metadata.len();

fs::remove_file(&path)?;

Ok(bytes_occupied)
}
}

struct NextFile<'a> {
file: PersistenceFile<'a>,
deleted: Option<u64>,
}

Expand Down Expand Up @@ -252,112 +327,63 @@ impl Persistence {
})
}

fn path(&self, id: u64) -> Result<PathBuf, Error> {
let file_name = format!("backup@{id}");
let path = self.path.join(file_name);

Ok(path)
}

/// Removes a file with provided id
/// Removes a persistence file with provided id
fn remove(&mut self, id: u64) -> Result<PathBuf, Error> {
let path = self.path(id)?;

// Query the fs to track size of removed persistence file
let metadata = fs::metadata(&path)?;
self.bytes_occupied -= metadata.len() as usize;
let file_name = format!("backup@{}", id);
let mut file = PersistenceFile::new(&self.path, file_name)?;
let path = file.path();

fs::remove_file(&path)?;
self.bytes_occupied -= file.delete()? as usize;

Ok(path)
}

/// Move corrupt file to special directory
fn handle_corrupt_file(&self) -> Result<(), Error> {
let id = self.current_read_file_id.expect("There is supposed to be a file here");
let path_src = self.path(id)?;
let dest_dir = self.path.join("corrupted");
fs::create_dir_all(&dest_dir)?;

let file_name = path_src.file_name().expect("The file name should exist");
let path_dest = dest_dir.join(file_name);

warn!("Moving corrupted file from {path_src:?} to {path_dest:?}");
fs::rename(path_src, path_dest)?;

Ok(())
}

/// Opens file to flush current inmemory write buffer to disk.
/// Also handles retention of previous files on disk
fn open_next_write_file(&mut self) -> Result<NextFile, Error> {
let next_file_id = self.backlog_files.back().map_or(0, |id| id + 1);
let file_name = format!("backup@{next_file_id}");
let next_file_path = self.path.join(file_name);
let next_file = OpenOptions::new().write(true).create(true).open(&next_file_path)?;

self.backlog_files.push_back(next_file_id);

let mut next = NextFile { path: next_file_path, file: next_file, deleted: None };
let mut backlog_files_count = self.backlog_files.len();

// File being read is also to be considered
if self.current_read_file_id.is_some() {
backlog_files_count += 1
}

// Return next file details if backlog is within limits
if backlog_files_count <= self.max_file_count {
return Ok(next);
}
// Delete earliest file if backlog limits crossed
let deleted = if backlog_files_count > self.max_file_count {
// Remove file being read, or first in backlog
// NOTE: keeps read buffer unchanged
let id = match self.current_read_file_id.take() {
Some(id) => id,
_ => self.backlog_files.pop_front().unwrap(),
};

if !self.non_destructive_read {
let deleted_file = self.remove(id)?;
warn!("file limit reached. deleting backup@{}; path = {deleted_file:?}", id);
}

// Remove file being read, or first in backlog
// NOTE: keeps read buffer unchanged
let id = match self.current_read_file_id.take() {
Some(id) => id,
_ => self.backlog_files.pop_front().unwrap(),
Some(id)
} else {
None
};

next.deleted = Some(id);
if !self.non_destructive_read {
let deleted_file = self.remove(id)?;
warn!("file limit reached. deleting backup@{}; path = {deleted_file:?}", id);
}

Ok(next)
let file_name = format!("backup@{}", next_file_id);
Ok(NextFile { file: PersistenceFile::new(&self.path, file_name)?, deleted })
}

/// Load the next persistence file to be read into memory
fn load_next_read_file(&mut self, current_read_file: &mut BytesMut) -> Result<(), Error> {
// Len always > 0 because of above if. Doesn't panic
let id = self.backlog_files.pop_front().unwrap();
let next_file_path = self.path(id)?;

let mut file = OpenOptions::new().read(true).open(&next_file_path)?;
let file_name = format!("backup@{}", id);
let mut file = PersistenceFile::new(&self.path, file_name)?;

// Load file into memory and store its id for deleting in the future
let metadata = fs::metadata(&next_file_path)?;

// Initialize next read file with 0s
current_read_file.clear();
let init = vec![0u8; metadata.len() as usize];
current_read_file.put_slice(&init);

file.read_exact(&mut current_read_file[..])?;
file.read(current_read_file)?;
self.current_read_file_id = Some(id);

// Verify with checksum
if current_read_file.len() < 8 {
self.handle_corrupt_file()?;
return Err(Error::CorruptedFile);
}

let expected_hash = current_read_file.get_u64();
let actual_hash = hash(&current_read_file[..]);
if actual_hash != expected_hash {
self.handle_corrupt_file()?;
return Err(Error::CorruptedFile);
}

Ok(())
}
}
Expand Down

0 comments on commit 6e65ba7

Please sign in to comment.