Skip to content

Commit

Permalink
Implement very simple advisory lock mechanism
Browse files Browse the repository at this point in the history
Blocking versions can't be implemented because they would require
spawning a new thread to avoid jamming up the background thread used for
other syscalls (the jamming up can generate a deadlock which is what
testing found). The blocking version would require spawning a new thread
with a standalone executor but that's horibbly expensive AND runs into
DataDog#448.
  • Loading branch information
vlovich committed May 11, 2024
1 parent 83d3023 commit 7989829
Show file tree
Hide file tree
Showing 7 changed files with 600 additions and 4 deletions.
353 changes: 350 additions & 3 deletions glommio/src/io/dma_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{
os::unix::io::{AsRawFd, RawFd},
path::Path,
rc::Rc,
sync::{Arc, Weak as AWeak},
};

use super::{
Expand Down Expand Up @@ -51,6 +52,66 @@ pub(crate) fn align_down(v: u64, align: u64) -> u64 {
v & !(align - 1)
}

/// This guard represents that an advisory lock is being held over some file. See for more information about how
/// advisory locks work. This is obtained through [DmaFile::try_lock_exclusive] / [DmaFile::try_lock_shared]. If not
/// explicitly [unlocked](Self::unlock), the advisory lock is released synchronously on Drop.
#[derive(Debug, Clone)]
pub struct AdvisoryLockGuard(Option<Arc<OwnedGlommioFile>>);

impl Drop for AdvisoryLockGuard {
fn drop(&mut self) {
if let Some(mut locked) = self.0.take().and_then(Arc::into_inner) {
unsafe { locked.funlock_immediately() }
}
}
}

impl AdvisoryLockGuard {
/// Explicitly releases the lock. The file can be successfully closed after calling this.
pub async fn unlock(mut self) -> Result<()> {
let inner = self.0.take().unwrap();

match Arc::try_unwrap(inner).map(GlommioFile::from) {
Ok(lock) => unsafe { lock.funlock() }.await,
Err(still_locked) => Err(crate::GlommioError::CanNotBeClosed(
crate::ResourceType::File(
still_locked
.path
.as_ref()
.map_or("path has already been taken!".to_string(), |p| {
p.to_string_lossy().to_string()
}),
),
"Another clone of this file exists somewhere - cannot close fd",
)),
}
}

/// Downgrade the lock guard to a weak reference that won't keep the file open. The lock will be released when the
/// .
pub fn downgrade(&self) -> WeakAdvisoryLockGuard {
WeakAdvisoryLockGuard(Arc::downgrade(self.0.as_ref().unwrap()))
}
}

/// This holds a weak reference to an advisory lock. The advisory lock may be dropped while this reference is held
/// in which case this instance will fail to [upgrade](Self::upgrade).
#[derive(Debug)]
pub struct WeakAdvisoryLockGuard(AWeak<OwnedGlommioFile>);

impl WeakAdvisoryLockGuard {
/// Returns a weak lock guard not pointing at anything.
pub fn new(&self) -> Self {
Self(AWeak::new())
}

/// Upgrades a weak handle to a held advisory lock back to a strong handle. Returns None if the advisory lock was
/// dropped.
pub fn upgrade(&self) -> Option<AdvisoryLockGuard> {
self.0.upgrade().map(|guard| AdvisoryLockGuard(Some(guard)))
}
}

#[derive(Debug, Clone)]
/// An asynchronously accessed Direct Memory Access (DMA) file.
///
Expand Down Expand Up @@ -604,6 +665,96 @@ impl DmaFile {
self.file.statx().await.map(Into::into)
}

/// Tries to acquire a process-wide advisory shared lock on this file instance. If an exclusive advisory lock is
/// already held for the underlying file (by any process), this immediately returns with an error. Similarly,
/// if a shared lock is already held on this file instance, this returns an error. To acquire multiple shared locks
/// in-process on the same file, you must acquire them through files obtained through separate calls to open the
/// file.
///
/// This is equivalent to [flock](https://linux.die.net/man/2/flock) with `LOCK_SH | LOCK_NB` and requires the
/// file have [read](OpenOptions::read) allowed on it.
///
/// # File instances and advisory locks.
///
/// For the purpose of advisory locks, clones and [dupes](Self::dup) count as being the same file instance (even
/// though dupes don't keep the file open). Advisory locks keep the underlying file that they were acquired on open
/// while they are held.
///
/// # Safety
///
/// The simple mechanism here only allows one advisory lock holder for the process for a given file instance
/// and that lock must either be [unlocked](AdvisoryLockGuard::unlock).
///
/// The reason only a single advisory lock can be held on a given file instance at a time is for safety and
/// soundness - the kernel advisory lock is a status bit and any unlock request will unlock the file. That causes
/// some danger if you acquire two shared locks and then unlock one of them - the file is unlocked from then on. Or
/// worse, you unlock one of them, acquire an exclusive lock and then release what you thought was the shared lock
/// but end up releasing the exclusive lock. Similarly, acquiring a shared lock will downgrade if you're holding
/// an exclusive lock.
///
/// # Usage
///
/// The purpose of advisory locks is to synchronize access to a resource. A common use-case is that if you're
/// reading to a file, you acquire a shared lock so that any other processes can also continue reading it but you're
/// guaranteed that no one trying to write holding an exclusive lock is admitted.
///
/// ```
/// /// ```no_run
/// use glommio::{
/// LocalExecutor,
/// io::{
/// OpenOptions,
/// DmaBuffer,
/// }
/// };
///
/// let ex = LocalExecutor::default();
/// ex.run(async {
/// // A new anonymous file is created within `some_directory/`.
/// let file = OpenOptions::new()
/// .create_new(true)
/// .read(true)
/// .write(true)
/// .tmpfile(true)
/// .dma_open("some_directory")
/// .await
/// .unwrap();
///
/// let guard = file.try_lock_shared().await.unwrap();
///
/// // Until the guard is dropped, no other process on the file system can acquire an advisory read lock
/// // on the file.
///
/// // File won't close until you either unlock the guard, drop it, or give it back as part of closing.
/// guard.unlock().await;
/// });
/// ```
///
/// NOTE:
/// [Currently](https://github.com/axboe/liburing/issues/85) flock isn't implemented through io_uring so it requires
/// dispatching to a background thread.
pub async fn try_lock_shared(&self) -> Result<AdvisoryLockGuard> {
self.file
.try_lock_shared()
.await
.map(|f| AdvisoryLockGuard(Some(Arc::new(f))))
}

/// Tries to acquire an advisory exclusive lock on this file instance. If successful, then the OS advisory lock
/// is acquired. If failed, it's either because the OS advisory lock is held as shared OR because this file instance
/// already handed out a lock (or is in the process of handing one out) - either exclusive or shared.
///
/// This is equivalent to [flock](https://linux.die.net/man/2/flock) with `LOCK_EX | LOCK_NB` and requires the
/// file have [write](OpenOptions::write) allowed on it.
///
/// See [try_lock_shared](Self::try_lock_shared) for more details.
pub async fn try_lock_exclusive(&self) -> Result<AdvisoryLockGuard> {
self.file
.try_lock_exclusive()
.await
.map(|f| AdvisoryLockGuard(Some(Arc::new(f))))
}

/// Attempt to confirm no other clones of this file exist. If no clones exist,
/// Ok(self) is returned. If clones remain, Err(self) is returned. Do not use
/// this unless you are implementing a polling mechanism to determine when it's
Expand Down Expand Up @@ -950,7 +1101,7 @@ impl WeakDmaFile {
}

/// The major ID of the device containing the filesystem where the file resides.
/// The device may be found by issuing a `readlink`` on `/sys/dev/block/<major>:<minor>`
/// The device may be found by issuing a `readlink` on `/sys/dev/block/<major>:<minor>`
pub fn dev_major(&self) -> u32 {
self.file.dev_major
}
Expand All @@ -972,8 +1123,11 @@ impl WeakDmaFile {
pub(crate) mod test {
use super::*;
use crate::{
enclose, test_utils::make_test_directories, ByteSliceMutExt, GlommioError, Latency,
LocalExecutor, ResourceType, Shares,
enclose,
sync::Semaphore,
test_utils::make_test_directories,
timer::{sleep, timeout},
ByteSliceMutExt, GlommioError, Latency, LocalExecutor, ResourceType, Shares,
};
use futures::join;
use futures_lite::{stream, StreamExt};
Expand Down Expand Up @@ -2208,4 +2362,197 @@ pub(crate) mod test {

assert_eq!(weak.strong_count(), 0);
});

dma_file_test!(advisory_lock_exclusive_wait_for_exclusive, path, _k, {
let file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.dma_open(path.join("testfile"))
.await
.unwrap();

let same_path_different_file = OpenOptions::new()
.read(true)
.write(true)
.dma_open(path.join("testfile"))
.await
.unwrap();

let duped = file.dup().unwrap();

let guard = file.try_lock_exclusive().await.unwrap();
let waiting_on_exclusive_lock_attempted = std::rc::Rc::new(Semaphore::new(0));

let notify_exclusive_lock_attempted = waiting_on_exclusive_lock_attempted.clone();
let acquire_exclusive = crate::spawn_local(timeout(Duration::from_millis(500), async move {
assert!(same_path_different_file.try_lock_exclusive().await.is_err(), "Shouldn't be able to acquire an exclusive lock on the same path while holding it through a different file");
notify_exclusive_lock_attempted.signal(1);
eprintln!("Starting to wait for exclusive lock");
loop {
if let Ok(locked) = same_path_different_file.try_lock_exclusive().await {
return Ok(locked)
}
sleep(Duration::from_millis(5)).await;
}
})).detach();

duped.try_lock_shared().await.expect_err(
"Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
);
duped.try_lock_exclusive().await.expect_err(
"Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
);

let file = file.try_take_last_clone().expect_err(
"Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
);
let weak = file.downgrade();
std::mem::drop(file);
assert!(
weak.upgrade().is_some(),
"Guard should be keeping the file open"
);

eprintln!("Waiting for exclusive lock to have been attempted");
waiting_on_exclusive_lock_attempted
.acquire(1)
.await
.unwrap();
eprintln!("Dropping guard");
guard.unlock().await.unwrap();

assert!(
weak.upgrade().is_none(),
"Guard unlocking should have implicitly caused the file to close"
);

timeout(Duration::from_millis(15), async move {
acquire_exclusive.await.expect("Task not cancelled")
})
.await
.expect("Exclusive lock should have been automatically acquired");
});

dma_file_test!(advisory_lock_shared_wait_for_exclusive, path, _k, {
let file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.dma_open(path.join("testfile"))
.await
.unwrap();

let same_path_different_file = OpenOptions::new()
.read(true)
.write(true)
.dma_open(path.join("testfile"))
.await
.unwrap();

let duped = file.dup().unwrap();

let guard = file.try_lock_exclusive().await.unwrap();
let waiting_on_shared_lock = std::rc::Rc::new(Semaphore::new(0));

let notify_waiting_on_shared_lock = waiting_on_shared_lock.clone();
let acquire_exclusive = crate::spawn_local(timeout(Duration::from_millis(500), async move {
assert!(same_path_different_file.try_lock_shared().await.is_err(), "Shouldn't be able to acquire a shared lock on the same path while holding an exclusive lock through a different file");
notify_waiting_on_shared_lock.signal(1);
loop {
if let Ok(locked) = same_path_different_file.try_lock_shared().await {
return Ok(locked);
}
sleep(Duration::from_millis(5)).await;
}
})).detach();

duped.try_lock_shared().await.expect_err(
"Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
);
duped.try_lock_exclusive().await.expect_err(
"Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
);

guard.unlock().await.unwrap();

timeout(Duration::from_millis(15), async move {
acquire_exclusive.await.expect("Task not cancelled")
})
.await
.expect("Shared lock should have been automatically acquired");
});

dma_file_test!(advisory_lock_shared, path, _k, {
let file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.dma_open(path.join("testfile"))
.await
.unwrap();

let same_path_different_file = OpenOptions::new()
.read(true)
.write(true)
.dma_open(path.join("testfile"))
.await
.unwrap();

let duped = file.dup().unwrap();

let guard = file.try_lock_shared().await.unwrap();

duped.try_lock_shared().await.expect_err(
"Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
);
duped.try_lock_exclusive().await.expect_err(
"Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
);

let guard2 = same_path_different_file
.try_lock_shared()
.await
.expect("Should be able to acquire a shared lock through a different open instance");

guard.unlock().await.unwrap();

duped.try_lock_exclusive().await.expect_err(
"Shouldn't be able to acquire an exclusive lock while holding a shared lock",
);

guard2.unlock().await.unwrap();

duped
.try_lock_exclusive()
.await
.expect("Locked exclusively once the shared lock was released");
});

dma_file_test!(advisory_weak_lock, path, _k, {
let file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.dma_open(path.join("testfile"))
.await
.unwrap();

let exclusive = file.try_lock_exclusive().await.unwrap();
let exclusive_clone = exclusive.clone();
let weak_exclusive = exclusive.downgrade();

assert!(weak_exclusive.upgrade().is_some());

exclusive
.unlock()
.await
.expect_err("Shouldn't be able to unlock while multiple lock references");

assert!(weak_exclusive.upgrade().is_some());

exclusive_clone.unlock().await.unwrap();

assert!(weak_exclusive.upgrade().is_none());
});
}
Loading

0 comments on commit 7989829

Please sign in to comment.