Skip to content

Commit

Permalink
fix: Allow a process to acquire a lease its already acquired (#2735)
Browse files Browse the repository at this point in the history
Related: https://github.com/GlareDB/cloud/issues/2746

Previously we weren't allowing it just errored with something along the
lines of:

```
Catalog error: Lease for database 'a9edc39a-6129-4b19-b8dc-02483df4fc6e' held by other process; other_process_id: d7796f4c-c3e4-4822-b59e-1b4b9f32a48d, current_process_id: d7796f4c-c3e4-4822-b59e-1b4b9f32a48d
```

I believe I originally intended to allow a leaser to acquire a lease its
already acquired based on the error message.

This doesn't fix the rate limit issue, but does partially alleviate
errors causes by failing to drop leases because dropping the lease
failed due to a rate limit.
  • Loading branch information
scsmithr authored and tychoish committed Mar 6, 2024
1 parent dad1f72 commit 0ebc336
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
43 changes: 37 additions & 6 deletions crates/metastore/src/storage/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl LeaseRenewer {
})?;

// Some other process has the lease.
if expires_at > now {
if expires_at > now && held_by != self.process_id {
return Err(StorageError::LeaseHeldByOtherProcess {
db_id: self.db_id,
other_process_id: held_by,
Expand Down Expand Up @@ -381,6 +381,8 @@ mod tests {
proto.try_into().unwrap()
}

// Note that the implementation for default uses non-zero uuids.
#[derive(Debug, Clone)]
struct LeaseRenewerTestParams {
now: SystemTime,
process_id: Uuid,
Expand Down Expand Up @@ -509,6 +511,33 @@ mod tests {
assert_eq!(None, lease.held_by);
}

#[tokio::test]
async fn lease_renewer_reacquire_own_lease() {
// See second error message in https://github.com/GlareDB/cloud/issues/2746
//
// If this process has already acquired the lease, attempting to acquire
// it again from the same process should work. This may happen if the
// metastore hits an error writing to object store (rate limit), and so
// fails the full request without actually being able to unlock the
// lease. Attempting to reacquire the lease in a second request should
// work though, since we still have the lease.
let params = LeaseRenewerTestParams::default();

let start = LeaseInformation {
state: LeaseState::Locked,
generation: 1,
expires_at: Some(params.now + LEASE_DURATION),
held_by: Some(params.process_id),
};
insert_lease(params.store.as_ref(), &params.visible_path, start).await;

let renewer = params.renewer();

// Make sure we can acquire it even though it's already been "acquired"
// (active lease inserted).
renewer.acquire_lease().await.unwrap();
}

#[tokio::test]
async fn remote_lease_idempotent_initialize() {
let store = Arc::new(object_store::memory::InMemory::new());
Expand Down Expand Up @@ -540,24 +569,26 @@ mod tests {
async fn remote_leaser_fail_acquire() {
let store = Arc::new(object_store::memory::InMemory::new());
let process_id = Uuid::new_v4();
let leaser = RemoteLeaser::new(process_id, store);
let leaser = RemoteLeaser::new(process_id, store.clone());

let db_id = Uuid::new_v4();
leaser.initialize(&db_id).await.unwrap();

let active = leaser.acquire(db_id).await.unwrap();

// Try to acquire a second lease.
let result = leaser.acquire(db_id).await;
// Try to acquire the lease using a different leaser.
let different = RemoteLeaser::new(Uuid::new_v4(), store);
let result = different.acquire(db_id).await;
assert!(
matches!(result, Err(StorageError::LeaseHeldByOtherProcess { .. })),
"result: {:?}",
result,
);

// Dropping previous lease should allow us to acquire a new one.
// Dropping previous lease should allow us to acquire a new one in the
// other process.
active.drop_lease().await.unwrap();

let _ = leaser.acquire(db_id).await.unwrap();
let _ = different.acquire(db_id).await.unwrap();
}
}
10 changes: 8 additions & 2 deletions crates/metastore/src/storage/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,14 @@ mod tests {
let db_id = Uuid::new_v4();
storage.initialize(db_id).await.unwrap();

// Sneakily get lease for catalog.
let lease = storage.leaser.acquire(db_id).await.unwrap();
// Sneakily get lease for catalog from a different process.
let process_id = Uuid::new_v4();
let different = Storage {
process_id,
store: storage.store.clone(), // Use the same store (in memory).
leaser: RemoteLeaser::new(process_id, storage.store.clone()),
};
let lease = different.leaser.acquire(db_id).await.unwrap();

// Reads should work.
let mut catalog = storage.read_catalog(db_id).await.unwrap();
Expand Down

0 comments on commit 0ebc336

Please sign in to comment.