Skip to content

Commit

Permalink
f Implement locking for FilesystemStore
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Apr 6, 2023
1 parent 3380625 commit af71cc6
Showing 1 changed file with 45 additions and 7 deletions.
52 changes: 45 additions & 7 deletions src/io/fs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ extern crate winapi;

use super::{KVStore, TransactionalWrite};

use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};

#[cfg(not(target_os = "windows"))]
use std::os::unix::io::AsRawFd;
Expand Down Expand Up @@ -38,11 +40,13 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<winapi::shared::ntdef::W

pub struct FilesystemStore {
dest_dir: PathBuf,
locks: Mutex<HashMap<(String, String), Arc<RwLock<()>>>>,
}

impl FilesystemStore {
pub fn new(dest_dir: PathBuf) -> Self {
Self { dest_dir }
let locks = Mutex::new(HashMap::new());
Self { dest_dir, locks }
}
}

Expand All @@ -51,20 +55,34 @@ impl KVStore for FilesystemStore {
type Writer = FilesystemWriter;

fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
let mut outer_lock = self.locks.lock().unwrap();
let lock_key = (namespace.to_string(), key.to_string());
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());

let mut dest_file = self.dest_dir.clone();
dest_file.push(namespace);
dest_file.push(key);
FilesystemReader::new(dest_file)
FilesystemReader::new(dest_file, inner_lock_ref)
}

fn write(&self, namespace: &str, key: &str) -> std::io::Result<Self::Writer> {
let mut outer_lock = self.locks.lock().unwrap();
let lock_key = (namespace.to_string(), key.to_string());
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());

let mut dest_file = self.dest_dir.clone();
dest_file.push(namespace);
dest_file.push(key);
FilesystemWriter::new(dest_file)
FilesystemWriter::new(dest_file, inner_lock_ref)
}

fn remove(&self, namespace: &str, key: &str) -> std::io::Result<bool> {
let mut outer_lock = self.locks.lock().unwrap();
let lock_key = (namespace.to_string(), key.to_string());
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key.clone()).or_default());

let _guard = inner_lock_ref.write().unwrap();

let mut dest_file = self.dest_dir.clone();
dest_file.push(namespace);
dest_file.push(key);
Expand Down Expand Up @@ -96,6 +114,21 @@ impl KVStore for FilesystemStore {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
}

if Arc::strong_count(&inner_lock_ref) == 2 {
// It's safe to remove the lock entry if we're the only one left holding a strong
// reference. Checking this is necessary to ensure we continue to distribute references to the
// same lock as long as some Writers/Readers are around. However, we still want to
// clean up the table when possible.
//
// Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are
// around, but is preferable to doing nothing *or* something overly complex such as
// implementing yet another RAII structure just for this pupose.
outer_lock.remove(&lock_key);
}

// Garbage collect all lock entries that are not referenced anymore.
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);

Ok(true)
}

Expand Down Expand Up @@ -134,18 +167,20 @@ impl KVStore for FilesystemStore {

pub struct FilesystemReader {
inner: BufReader<fs::File>,
lock_ref: Arc<RwLock<()>>,
}

impl FilesystemReader {
pub fn new(dest_file: PathBuf) -> std::io::Result<Self> {
fn new(dest_file: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
let f = fs::File::open(dest_file.clone())?;
let inner = BufReader::new(f);
Ok(Self { inner })
Ok(Self { inner, lock_ref })
}
}

impl Read for FilesystemReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let _guard = self.lock_ref.read().unwrap();
self.inner.read(buf)
}
}
Expand All @@ -155,10 +190,11 @@ pub struct FilesystemWriter {
parent_directory: PathBuf,
tmp_file: PathBuf,
tmp_writer: BufWriter<fs::File>,
lock_ref: Arc<RwLock<()>>,
}

impl FilesystemWriter {
pub fn new(dest_file: PathBuf) -> std::io::Result<Self> {
fn new(dest_file: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
let msg = format!("Could not retrieve parent directory of {}.", dest_file.display());
let parent_directory = dest_file
.parent()
Expand All @@ -179,7 +215,7 @@ impl FilesystemWriter {

let tmp_writer = BufWriter::new(fs::File::create(&tmp_file)?);

Ok(Self { dest_file, parent_directory, tmp_file, tmp_writer })
Ok(Self { dest_file, parent_directory, tmp_file, tmp_writer, lock_ref })
}
}

Expand All @@ -198,6 +234,8 @@ impl Write for FilesystemWriter {
impl TransactionalWrite for FilesystemWriter {
fn commit(&mut self) -> std::io::Result<()> {
self.flush()?;

let _guard = self.lock_ref.write().unwrap();
// Fsync the parent directory on Unix.
#[cfg(not(target_os = "windows"))]
{
Expand Down

0 comments on commit af71cc6

Please sign in to comment.