Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: handle files on disk as PersistenceFile #314

Merged
merged 3 commits into from
Dec 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 119 additions & 93 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use log::{self, debug, error, info, warn};
use seahash::hash;

use std::collections::VecDeque;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Write};
use std::fs::{self, OpenOptions};
use std::io::{self, copy, Write};
use std::mem;
use std::path::{Path, PathBuf};

Expand Down Expand Up @@ -94,22 +94,15 @@ impl Storage {

match &mut self.persistence {
Some(persistence) => {
let hash = hash(&self.current_write_file[..]);
let mut next_file = persistence.open_next_write_file()?;
info!(
"Flushing data to disk for stoarge: {}; path = {:?}",
self.name, next_file.path
);

next_file.file.write_all(&hash.to_be_bytes())?;
next_file.file.write_all(&self.current_write_file[..])?;
next_file.file.flush()?;
let NextFile { mut file, deleted } = persistence.open_next_write_file()?;
info!("Flushing data to disk for stoarge: {}; path = {:?}", self.name, file.path());
file.write(&mut self.current_write_file)?;

// 8 is the number of bytes the hash(u64) occupies
persistence.bytes_occupied += 8 + self.current_write_file.len();

self.current_write_file.clear();
Ok(next_file.deleted)

Ok(deleted)
}
None => {
// TODO(RT): Make sure that disk files starts with id 1 to represent in memory file
Expand Down Expand Up @@ -206,9 +199,91 @@ fn get_file_ids(path: &Path) -> Result<VecDeque<u64>, Error> {
Ok(file_ids)
}

struct NextFile {
path: PathBuf,
file: File,
/// A handle to describe a persistence file on disk
pub struct PersistenceFile<'a> {
/// Path to the persistence directory
dir: &'a Path,
/// Name of the file e.g. `backup@1`
file_name: String,
}

impl<'a> PersistenceFile<'a> {
pub fn new(dir: &'a Path, file_name: String) -> Result<Self, Error> {
Ok(Self { dir, file_name })
}

/// Path of persistence file when stored on disk
pub fn path(&self) -> PathBuf {
self.dir.join(&self.file_name)
}

// Moves the corrupt persistence file into special directory
fn handle_corrupt_file(&self) -> Result<(), Error> {
let path_src = self.path();
let dest_dir = self.dir.join("corrupted");
fs::create_dir_all(&dest_dir)?;
let path_dest = dest_dir.join(&self.file_name);

warn!("Moving corrupted file from {path_src:?} to {path_dest:?}");
fs::rename(path_src, path_dest)?;

Ok(())
}

/// Read contents of the persistence file from disk into buffer in memory
pub fn read(&mut self, buf: &mut BytesMut) -> Result<(), Error> {
let path = self.path();
let mut file = OpenOptions::new().read(true).open(path)?;

// Initialize buffer and load next read file
buf.clear();
copy(&mut file, &mut buf.writer())?;

// Verify with checksum
if buf.len() < 8 {
self.handle_corrupt_file()?;
return Err(Error::CorruptedFile);
}

let expected_hash = buf.get_u64();
let actual_hash = hash(&buf[..]);
if actual_hash != expected_hash {
self.handle_corrupt_file()?;
return Err(Error::CorruptedFile);
}

Ok(())
}

/// Write contents of buffer from memory onto the persistence file in disk
pub fn write(&mut self, buf: &mut BytesMut) -> Result<(), Error> {
let path = self.path();
let mut file = OpenOptions::new().write(true).create(true).open(path)?;

let hash = hash(&buf[..]);
file.write_all(&hash.to_be_bytes())?;
file.write_all(&buf[..])?;
file.flush()?;

Ok(())
}

/// Deletes the persistence file from disk
pub fn delete(&mut self) -> Result<u64, Error> {
let path = self.path();

// Query the fs to track size of removed persistence file
let metadata = fs::metadata(&path)?;
let bytes_occupied = metadata.len();

fs::remove_file(&path)?;

Ok(bytes_occupied)
}
}

struct NextFile<'a> {
file: PersistenceFile<'a>,
deleted: Option<u64>,
}

Expand Down Expand Up @@ -252,112 +327,63 @@ impl Persistence {
})
}

fn path(&self, id: u64) -> Result<PathBuf, Error> {
let file_name = format!("backup@{id}");
let path = self.path.join(file_name);

Ok(path)
}

/// Removes a file with provided id
/// Removes a persistence file with provided id
fn remove(&mut self, id: u64) -> Result<PathBuf, Error> {
let path = self.path(id)?;

// Query the fs to track size of removed persistence file
let metadata = fs::metadata(&path)?;
self.bytes_occupied -= metadata.len() as usize;
let file_name = format!("backup@{}", id);
let mut file = PersistenceFile::new(&self.path, file_name)?;
let path = file.path();

fs::remove_file(&path)?;
self.bytes_occupied -= file.delete()? as usize;

Ok(path)
}

/// Move corrupt file to special directory
fn handle_corrupt_file(&self) -> Result<(), Error> {
let id = self.current_read_file_id.expect("There is supposed to be a file here");
let path_src = self.path(id)?;
let dest_dir = self.path.join("corrupted");
fs::create_dir_all(&dest_dir)?;

let file_name = path_src.file_name().expect("The file name should exist");
let path_dest = dest_dir.join(file_name);

warn!("Moving corrupted file from {path_src:?} to {path_dest:?}");
fs::rename(path_src, path_dest)?;

Ok(())
}

/// Opens file to flush current inmemory write buffer to disk.
/// Also handles retention of previous files on disk
fn open_next_write_file(&mut self) -> Result<NextFile, Error> {
let next_file_id = self.backlog_files.back().map_or(0, |id| id + 1);
let file_name = format!("backup@{next_file_id}");
let next_file_path = self.path.join(file_name);
let next_file = OpenOptions::new().write(true).create(true).open(&next_file_path)?;

self.backlog_files.push_back(next_file_id);

let mut next = NextFile { path: next_file_path, file: next_file, deleted: None };
let mut backlog_files_count = self.backlog_files.len();

// File being read is also to be considered
if self.current_read_file_id.is_some() {
backlog_files_count += 1
}

// Return next file details if backlog is within limits
if backlog_files_count <= self.max_file_count {
return Ok(next);
}
// Delete earliest file if backlog limits crossed
let deleted = if backlog_files_count > self.max_file_count {
// Remove file being read, or first in backlog
// NOTE: keeps read buffer unchanged
let id = match self.current_read_file_id.take() {
Some(id) => id,
_ => self.backlog_files.pop_front().unwrap(),
};

if !self.non_destructive_read {
let deleted_file = self.remove(id)?;
warn!("file limit reached. deleting backup@{}; path = {deleted_file:?}", id);
}

// Remove file being read, or first in backlog
// NOTE: keeps read buffer unchanged
let id = match self.current_read_file_id.take() {
Some(id) => id,
_ => self.backlog_files.pop_front().unwrap(),
Some(id)
} else {
None
};

next.deleted = Some(id);
if !self.non_destructive_read {
let deleted_file = self.remove(id)?;
warn!("file limit reached. deleting backup@{}; path = {deleted_file:?}", id);
}

Ok(next)
let file_name = format!("backup@{}", next_file_id);
Ok(NextFile { file: PersistenceFile::new(&self.path, file_name)?, deleted })
}

/// Load the next persistence file to be read into memory
fn load_next_read_file(&mut self, current_read_file: &mut BytesMut) -> Result<(), Error> {
// Len always > 0 because of above if. Doesn't panic
let id = self.backlog_files.pop_front().unwrap();
let next_file_path = self.path(id)?;

let mut file = OpenOptions::new().read(true).open(&next_file_path)?;
let file_name = format!("backup@{}", id);
let mut file = PersistenceFile::new(&self.path, file_name)?;

// Load file into memory and store its id for deleting in the future
let metadata = fs::metadata(&next_file_path)?;

// Initialize next read file with 0s
current_read_file.clear();
let init = vec![0u8; metadata.len() as usize];
current_read_file.put_slice(&init);

file.read_exact(&mut current_read_file[..])?;
file.read(current_read_file)?;
self.current_read_file_id = Some(id);

// Verify with checksum
if current_read_file.len() < 8 {
self.handle_corrupt_file()?;
return Err(Error::CorruptedFile);
}

let expected_hash = current_read_file.get_u64();
let actual_hash = hash(&current_read_file[..]);
if actual_hash != expected_hash {
self.handle_corrupt_file()?;
return Err(Error::CorruptedFile);
}

Ok(())
}
}
Expand Down
Loading