diff --git a/crates/matrix-sdk-crypto/src/store/integration_tests.rs b/crates/matrix-sdk-crypto/src/store/integration_tests.rs index 4db7de65b38..58d1b6fd35d 100644 --- a/crates/matrix-sdk-crypto/src/store/integration_tests.rs +++ b/crates/matrix-sdk-crypto/src/store/integration_tests.rs @@ -59,7 +59,7 @@ macro_rules! cryptostore_integration_tests { device_id!("BOBDEVICE") } - async fn get_loaded_store(name: &str) -> (ReadOnlyAccount, impl CryptoStore) { + pub async fn get_loaded_store(name: &str) -> (ReadOnlyAccount, impl CryptoStore) { let store = get_store(name, None).await; let account = get_account(); store.save_account(account.clone()).await.expect("Can't save account"); @@ -855,3 +855,76 @@ macro_rules! cryptostore_integration_tests { } }; } + +#[allow(unused_macros)] +#[macro_export] +macro_rules! cryptostore_integration_tests_time { + () => { + mod cryptostore_integration_tests_time { + use std::time::Duration; + + use matrix_sdk_test::async_test; + use $crate::store::CryptoStore as _; + + use super::cryptostore_integration_tests::*; + + #[async_test] + async fn test_lease_locks() { + let (_account, store) = get_loaded_store("lease_locks").await; + + let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); + assert!(acquired0); + + // Should extend the lease automatically (same holder). + let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); + assert!(acquired2); + + // Should extend the lease automatically (same holder + time is ok). + let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); + assert!(acquired3); + + // Another attempt at taking the lock should fail, because it's taken. + let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); + assert!(!acquired4); + + // Even if we insist. + let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); + assert!(!acquired5); + + // That's a nice test we got here, go take a little nap. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Still too early. + let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); + assert!(!acquired55); + + // Ok you can take another nap then. + tokio::time::sleep(Duration::from_millis(250)).await; + + // At some point, we do get the lock. + let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap(); + assert!(acquired6); + + tokio::time::sleep(Duration::from_millis(1)).await; + + // The other gets it almost immediately too. + let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); + assert!(acquired7); + + tokio::time::sleep(Duration::from_millis(1)).await; + + // But when we take a longer lease... + let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); + assert!(acquired8); + + // It blocks the other user. + let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); + assert!(!acquired9); + + // We can hold onto our lease. + let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); + assert!(acquired10); + } + } + }; +} diff --git a/crates/matrix-sdk-crypto/src/store/locks.rs b/crates/matrix-sdk-crypto/src/store/locks.rs index d692068d6c5..d6171fe4618 100644 --- a/crates/matrix-sdk-crypto/src/store/locks.rs +++ b/crates/matrix-sdk-crypto/src/store/locks.rs @@ -14,21 +14,37 @@ //! Collection of small helpers that implement store-based locks. //! -//! Those locks are implemented as one value in the key-value crypto store, that -//! exists if and only if the lock has been taken. For this to work correctly, -//! we rely on multiple assumptions: +//! This is a per-process lock that may be used only for very specific use +//! cases, where multiple processes might concurrently write to the same +//! database at the same time; this would invalidate crypto store caches, so +//! that should be done mindfully. Such a lock can be acquired multiple times by +//! the same process, and it remains active as long as there's at least one user +//! in a given process. //! -//! - the store must allow concurrent reads and writes from multiple processes. -//! For instance, for -//! sqlite, this means that it is running in [WAL](https://www.sqlite.org/wal.html) mode. -//! - the two operations used in the store implementation, -//! `insert_custom_value_if_missing` and -//! `remove_custom_value`, must be atomic / implemented in a transaction. +//! The lock is implemented using time-based leases to values inserted in a +//! crypto store. The store maintains the lock identifier (key), who's the +//! current holder (value), and an expiration timestamp on the side; see also +//! `CryptoStore::try_take_leased_lock` for more details. +//! +//! The lock is initially acquired for a certain period of time (namely, the +//! duration of a lease, aka `LEASE_DURATION_MS`), and then a "heartbeat" task +//! renews the lease to extend its duration, every so often (namely, every +//! `EXTEND_LEASE_EVERY_MS`). Since the tokio scheduler might be busy, the +//! extension request should happen way more frequently than the duration of a +//! lease, in case a deadline is missed. The current values have been chosen to +//! reflect that, with a ratio of 1:10 as of 2023-06-23. +//! +//! Releasing the lock happens naturally, by not renewing a lease. It happens +//! automatically after the duration of the last lease, at most. -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{atomic::AtomicU32, Arc}, + time::Duration, +}; +use matrix_sdk_common::executor::JoinHandle; use tokio::{sync::Mutex, time::sleep}; -use tracing::{instrument, trace, warn}; +use tracing::instrument; use super::DynCryptoStore; use crate::CryptoStoreError; @@ -42,6 +58,21 @@ enum WaitingTime { Stop, } +/// A guard on the crypto store lock. +/// +/// The lock will be automatically released a short period of time after all the +/// guards have dropped. +#[derive(Debug)] +pub struct CryptoStoreLockGuard { + num_holders: Arc, +} + +impl Drop for CryptoStoreLockGuard { + fn drop(&mut self) { + self.num_holders.fetch_sub(1, atomic::Ordering::SeqCst); + } +} + /// A store-based lock for the `CryptoStore`. #[derive(Clone, Debug)] pub struct CryptoStoreLock { @@ -55,7 +86,14 @@ pub struct CryptoStoreLock { /// /// When the number of holders is decreased to 0, then the lock must be /// released in the store. - num_holders: Arc>, + num_holders: Arc, + + /// A mutex to control an attempt to take the lock, to avoid making it + /// reentrant. + locking_attempt: Arc>, + + /// Current renew task spawned by `try_lock_once`. + renew_task: Arc>>>, /// The key used in the key/value mapping for the lock entry. lock_key: String, @@ -68,6 +106,16 @@ pub struct CryptoStoreLock { } impl CryptoStoreLock { + /// Amount of time a lease of the lock should last, in milliseconds. + pub const LEASE_DURATION_MS: u32 = 500; + + /// Period of time between two attempts to extend the lease. We'll + /// re-request a lease for an entire duration of `LEASE_DURATION_MS` + /// milliseconds, every `EXTEND_LEASE_EVERY_MS`, so this has to + /// be an amount safely low compared to `LEASE_DURATION_MS`, to make sure + /// that we can miss a deadline without compromising the lock. + const EXTEND_LEASE_EVERY_MS: u64 = 50; + /// Initial backoff, in milliseconds. This is the time we wait the first /// time, if taking the lock initially failed. const INITIAL_BACKOFF_MS: u32 = 10; @@ -89,53 +137,116 @@ impl CryptoStoreLock { lock_key, lock_holder, backoff: Arc::new(Mutex::new(WaitingTime::Some(Self::INITIAL_BACKOFF_MS))), - num_holders: Arc::new(Mutex::new(0)), + num_holders: Arc::new(0.into()), + locking_attempt: Arc::new(Mutex::new(())), + renew_task: Default::default(), } } /// Try to lock once, returns whether the lock was obtained or not. #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))] - pub async fn try_lock_once(&self) -> Result { - // Hold the num_holders lock for the entire's function lifetime, to avoid - // internal races if called in a reentrant manner. - let mut holders = self.num_holders.lock().await; + pub async fn try_lock_once(&self) -> Result, CryptoStoreError> { + // Hold onto the locking attempt mutex for the entire lifetime of this + // function, to avoid multiple reentrant calls. + let mut _attempt = self.locking_attempt.lock().await; // If another thread obtained the lock, make sure to only superficially increase // the number of holders, and carry on. - if *holders > 0 { - trace!("We already had the lock, incrementing holder count"); - *holders += 1; - return Ok(true); + if self.num_holders.load(atomic::Ordering::SeqCst) > 0 { + // Note: between the above load and the fetch_add below, another thread may + // decrement `num_holders`. That's fine because that means the lock + // was taken by at least one thread, and after this call it will be + // taken by at least one thread. + tracing::trace!("We already had the lock, incrementing holder count"); + self.num_holders.fetch_add(1, atomic::Ordering::SeqCst); + let guard = CryptoStoreLockGuard { num_holders: self.num_holders.clone() }; + return Ok(Some(guard)); } - let inserted = self + let acquired = self .store - .insert_custom_value_if_missing(&self.lock_key, self.lock_holder.as_bytes().to_vec()) + .try_take_leased_lock(Self::LEASE_DURATION_MS, &self.lock_key, &self.lock_holder) .await?; - if inserted { - trace!("Successfully acquired lock through db write"); - *holders += 1; - return Ok(true); + if !acquired { + tracing::trace!("Couldn't acquire the lock immediately."); + return Ok(None); } - // Double-check that we were not interrupted last time we tried to take the - // lock, and forgot to release it; in that case, we *still* hold it. - let previous = self.store.get_custom_value(&self.lock_key).await?; - if previous.as_deref() == Some(self.lock_holder.as_bytes()) { - warn!( - "Crypto-store lock {} was already taken by {}; let's pretend we just acquired it.", - self.lock_key, self.lock_holder - ); - *holders += 1; - return Ok(true); - } + tracing::trace!("Acquired the lock, spawning the lease extension task."); + + // This is the first time we've acquired the lock. We're going to spawn the task + // that will renew the lease. - if let Some(prev_holder) = previous { - trace!("Lock is already taken by {}", String::from_utf8_lossy(&prev_holder)); + // Clone data to be owned by the task. + let this = self.clone(); + + let mut renew_task = self.renew_task.lock().await; + + // Cancel the previous task, if any. That's safe to do, because: + // - either the task was done, + // - or it was still running, but taking a lock in the db has to be an atomic + // operation + // running in a transaction. + + if let Some(_prev) = renew_task.take() { + #[cfg(not(target_arch = "wasm32"))] + _prev.abort(); } - Ok(false) + // Restart a new one. + *renew_task = Some(matrix_sdk_common::executor::spawn(async move { + loop { + { + // First, check if there are still users of this lock. + // + // This is not racy, because: + // - the `locking_attempt` mutex makes sure we don't have unexpected + // interactions with the non-atomic sequence above in `try_lock_once` + // (check > 0, then add 1). + // - other entities holding onto the `num_holders` atomic will only + // decrease it over time. + + let _guard = this.locking_attempt.lock().await; + + // If there are no more users, we can quit. + if this.num_holders.load(atomic::Ordering::SeqCst) == 0 { + tracing::info!("exiting the lease extension loop"); + + // Cancel the lease with another 0ms lease. + // If we don't get the lock, that's (weird but) fine. + let _ = this + .store + .try_take_leased_lock(0, &this.lock_key, &this.lock_holder) + .await; + + // Exit the loop. + break; + } + } + + sleep(Duration::from_millis(Self::EXTEND_LEASE_EVERY_MS)).await; + + if let Err(err) = this + .store + .try_take_leased_lock( + Self::LEASE_DURATION_MS, + &this.lock_key, + &this.lock_holder, + ) + .await + { + tracing::error!("error when extending lock lease: {err:#}"); + // Exit the loop. + break; + } + } + })); + + self.num_holders.fetch_add(1, atomic::Ordering::SeqCst); + + let guard = CryptoStoreLockGuard { num_holders: self.num_holders.clone() }; + Ok(Some(guard)) } /// Attempt to take the lock, with exponential backoff if the lock has @@ -147,17 +258,20 @@ impl CryptoStoreLock { /// and will return a timeout error upon locking. If not provided, /// will wait for [`Self::MAX_BACKOFF_MS`]. #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))] - pub async fn spin_lock(&self, max_backoff: Option) -> Result<(), CryptoStoreError> { + pub async fn spin_lock( + &self, + max_backoff: Option, + ) -> Result { let max_backoff = max_backoff.unwrap_or(Self::MAX_BACKOFF_MS); // Note: reads/writes to the backoff are racy across threads in theory, but the // lock in `try_lock_once` should sequentialize it all. loop { - if self.try_lock_once().await? { + if let Some(guard) = self.try_lock_once().await? { // Reset backoff before returning, for the next attempt to lock. *self.backoff.lock().await = WaitingTime::Some(Self::INITIAL_BACKOFF_MS); - return Ok(()); + return Ok(guard); } // Exponential backoff! Multiply by 2 the time we've waited before, cap it to @@ -183,45 +297,6 @@ impl CryptoStoreLock { sleep(Duration::from_millis(wait.into())).await; } } - - /// Release the lock taken previously with [`Self::try_lock_once()`]. - /// - /// Will return an error if the lock wasn't taken. - #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))] - pub async fn unlock(&self) -> Result<(), CryptoStoreError> { - // Keep the lock for the whole's function lifetime, to avoid races with other - // threads trying to acquire/release the lock at the same time. - let mut holders = self.num_holders.lock().await; - - if *holders > 1 { - // There's at least one other holder, so just decrease the number of holders. - *holders -= 1; - trace!("not releasing, because another thread holds onto it"); - return Ok(()); - } - - // Here, holders == 1 (or 0, but that is supposed to trigger the - // `MissingLockValue` error then). - - let read = self - .store - .get_custom_value(&self.lock_key) - .await? - .ok_or(CryptoStoreError::from(LockStoreError::MissingLockValue))?; - - if read != self.lock_holder.as_bytes() { - return Err(LockStoreError::IncorrectLockValue.into()); - } - - let removed = self.store.remove_custom_value(&self.lock_key).await?; - if removed { - *holders = 0; - trace!("successfully released"); - Ok(()) - } else { - Err(LockStoreError::MissingLockValue.into()) - } - } } /// Error related to the locking API of the crypto store. @@ -250,6 +325,11 @@ mod tests { use super::*; use crate::store::{IntoCryptoStore as _, MemoryStore}; + async fn release_lock(guard: Option) { + drop(guard); + sleep(Duration::from_millis(CryptoStoreLock::EXTEND_LEASE_EVERY_MS)).await; + } + #[async_test] async fn test_simple_lock_unlock() -> Result<(), CryptoStoreError> { let mem_store = MemoryStore::new(); @@ -259,30 +339,19 @@ mod tests { // The lock plain works when used with a single holder. let acquired = lock.try_lock_once().await?; - assert!(acquired); - assert_eq!(*lock.num_holders.lock().await, 1); + assert!(acquired.is_some()); + assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1); // Releasing works. - assert!(lock.unlock().await.is_ok()); - assert_eq!(*lock.num_holders.lock().await, 0); - - // Releasing another time is an error. - assert_matches!( - lock.unlock().await, - Err(CryptoStoreError::Lock(LockStoreError::MissingLockValue)) - ); + release_lock(acquired).await; + assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0); // Spin locking on the same lock always works, assuming no concurrent access. - assert!(lock.spin_lock(None).await.is_ok()); + let acquired = lock.spin_lock(None).await.unwrap(); - // Releasing works again. - assert!(lock.unlock().await.is_ok()); - - // Releasing another time is still an error. - assert_matches!( - lock.unlock().await, - Err(CryptoStoreError::Lock(LockStoreError::MissingLockValue)) - ); + // Releasing still works. + release_lock(Some(acquired)).await; + assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0); Ok(()) } @@ -296,26 +365,19 @@ mod tests { // When a lock is acquired... let acquired = lock.try_lock_once().await?; - assert!(acquired); - assert_eq!(*lock.num_holders.lock().await, 1); + assert!(acquired.is_some()); + assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1); - // But then forgotten... + // But then forgotten... (note: no need to release the guard) drop(lock); - // The DB still knows about it... - let prev = dyn_store.get_custom_value("key").await?; - assert_eq!(String::from_utf8_lossy(prev.as_ref().unwrap()), "first"); - // And when rematerializing the lock with the same key/value... let lock = CryptoStoreLock::new(dyn_store.clone(), "key".to_owned(), "first".to_owned()); // We still got it. let acquired = lock.try_lock_once().await?; - assert!(acquired); - assert_eq!(*lock.num_holders.lock().await, 1); - - // And can release it. - assert!(lock.unlock().await.is_ok()); + assert!(acquired.is_some()); + assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1); Ok(()) } @@ -329,23 +391,19 @@ mod tests { // Taking the lock twice... let acquired = lock.try_lock_once().await?; - assert!(acquired); + assert!(acquired.is_some()); - let acquired = lock.try_lock_once().await?; - assert!(acquired); + let acquired2 = lock.try_lock_once().await?; + assert!(acquired2.is_some()); - assert_eq!(*lock.num_holders.lock().await, 2); + assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 2); // ...means we can release it twice. - assert!(lock.unlock().await.is_ok()); + release_lock(acquired).await; + assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1); - assert!(lock.unlock().await.is_ok()); - - // Releasing another time is still an error. - assert_matches!( - lock.unlock().await, - Err(CryptoStoreError::Lock(LockStoreError::MissingLockValue)) - ); + release_lock(acquired2).await; + assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0); Ok(()) } @@ -360,21 +418,24 @@ mod tests { // When the first process takes the lock... let acquired1 = lock1.try_lock_once().await?; - assert!(acquired1); + assert!(acquired1.is_some()); // The second can't take it immediately. let acquired2 = lock2.try_lock_once().await?; - assert!(!acquired2); + assert!(acquired2.is_none()); let lock2_clone = lock2.clone(); let handle = spawn(async move { lock2_clone.spin_lock(Some(1000)).await }); sleep(Duration::from_millis(100)).await; - lock1.unlock().await?; + drop(acquired1); - // lock2 in the background managed to get the lock. - assert!(handle.await.is_ok()); + // lock2 in the background manages to get the lock at some point. + let _acquired2 = handle + .await + .expect("join handle is properly awaited") + .expect("lock was obtained after spin-locking"); // Now if lock1 tries to get the lock with a small timeout, it will fail. assert_matches!( diff --git a/crates/matrix-sdk-crypto/src/store/memorystore.rs b/crates/matrix-sdk-crypto/src/store/memorystore.rs index 49e83ec94f9..c9b1ccbfcf0 100644 --- a/crates/matrix-sdk-crypto/src/store/memorystore.rs +++ b/crates/matrix-sdk-crypto/src/store/memorystore.rs @@ -12,7 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, convert::Infallible, sync::Arc}; +use std::{ + collections::HashMap, + convert::Infallible, + sync::Arc, + time::{Duration, Instant}, +}; use async_trait::async_trait; use dashmap::{DashMap, DashSet}; @@ -57,6 +62,7 @@ pub struct MemoryStore { key_requests_by_info: Arc>, direct_withheld_info: Arc>>, custom_values: Arc>>, + leases: Arc>, } impl Default for MemoryStore { @@ -71,6 +77,7 @@ impl Default for MemoryStore { key_requests_by_info: Default::default(), direct_withheld_info: Default::default(), custom_values: Default::default(), + leases: Default::default(), } } } @@ -326,6 +333,43 @@ impl CryptoStore for MemoryStore { let was_there = self.custom_values.remove(key).is_some(); Ok(was_there) } + + async fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result { + let now = Instant::now(); + let expiration = now + Duration::from_millis(lease_duration_ms.into()); + if let Some(mut prev) = self.leases.get_mut(key) { + if prev.0 == holder { + // We had the lease before, extend it. + prev.1 = expiration; + Ok(true) + } else { + // We didn't have it. + if prev.1 < now { + // Steal it! + prev.0 = holder.to_owned(); + prev.1 = expiration; + Ok(true) + } else { + // We tried our best. + Ok(false) + } + } + } else { + self.leases.insert( + key.to_owned(), + ( + holder.to_owned(), + Instant::now() + Duration::from_millis(lease_duration_ms.into()), + ), + ); + Ok(true) + } + } } #[cfg(test)] diff --git a/crates/matrix-sdk-crypto/src/store/traits.rs b/crates/matrix-sdk-crypto/src/store/traits.rs index 5c7ad6c9cef..1ae40f34f5c 100644 --- a/crates/matrix-sdk-crypto/src/store/traits.rs +++ b/crates/matrix-sdk-crypto/src/store/traits.rs @@ -243,6 +243,24 @@ pub trait CryptoStore: AsyncTraitDeps { /// Returns a boolean indicating whether the value was actually present in /// the store. async fn remove_custom_value(&self, key: &str) -> Result; + + /// Try to take a leased lock. + /// + /// This attempts to take a lock for the given lease duration. + /// + /// - If we already had the lease, this will extend the lease. + /// - If we didn't, but the previous lease has expired, we will acquire the + /// lock. + /// - If there was no previous lease, we will acquire the lock. + /// - Otherwise, we don't get the lock. + /// + /// Returns whether taking the lock succeeded. + async fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result; } #[repr(transparent)] @@ -401,6 +419,15 @@ impl CryptoStore for EraseCryptoStoreError { async fn remove_custom_value(&self, key: &str) -> Result { self.0.remove_custom_value(key).await.map_err(Into::into) } + + async fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result { + self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into) + } } /// A type-erased [`CryptoStore`]. diff --git a/crates/matrix-sdk-indexeddb/src/crypto_store.rs b/crates/matrix-sdk-indexeddb/src/crypto_store.rs index 96c1e14e6d0..50540d02edc 100644 --- a/crates/matrix-sdk-indexeddb/src/crypto_store.rs +++ b/crates/matrix-sdk-indexeddb/src/crypto_store.rs @@ -34,7 +34,9 @@ use matrix_sdk_crypto::{ TrackedUser, }; use matrix_sdk_store_encryption::StoreCipher; -use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, RoomId, TransactionId, UserId}; +use ruma::{ + DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, RoomId, TransactionId, UserId, +}; use serde::{de::DeserializeOwned, Serialize}; use tokio::sync::Mutex; use wasm_bindgen::JsValue; @@ -1090,6 +1092,47 @@ impl_crypto_store! { Ok(false) } } + + async fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result { + // As of 2023-06-23, the code below hasn't been tested yet. + let key = JsValue::from_str(key); + let txn = self + .inner + .transaction_on_one_with_mode(keys::CORE, IdbTransactionMode::Readwrite)?; + let object_store = txn + .object_store(keys::CORE)?; + + #[derive(serde::Deserialize, serde::Serialize)] + struct Lease { + holder: String, + expiration_ts: u64, + } + + let now_ts: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); + let expiration_ts = now_ts + lease_duration_ms as u64; + + let prev = object_store.get(&key)?.await?; + match prev { + Some(prev) => { + let lease: Lease = self.deserialize_value(prev)?; + if lease.holder == holder || lease.expiration_ts < now_ts { + object_store.put_key_val(&key, &self.serialize_value(&Lease { holder: holder.to_owned(), expiration_ts })?)?; + Ok(true) + } else { + Ok(false) + } + } + None => { + object_store.put_key_val(&key, &self.serialize_value(&Lease { holder: holder.to_owned(), expiration_ts })?)?; + Ok(true) + } + } + } } impl Drop for IndexeddbCryptoStore { diff --git a/crates/matrix-sdk-sqlite/migrations/crypto_store/007_lock_leases.sql b/crates/matrix-sdk-sqlite/migrations/crypto_store/007_lock_leases.sql new file mode 100644 index 00000000000..f29cdb113e3 --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/crypto_store/007_lock_leases.sql @@ -0,0 +1,5 @@ +CREATE TABLE "lease_locks" ( + "key" TEXT PRIMARY KEY NOT NULL, + "holder" TEXT NOT NULL, + "expiration_ts" REAL NOT NULL +); diff --git a/crates/matrix-sdk-sqlite/src/crypto_store.rs b/crates/matrix-sdk-sqlite/src/crypto_store.rs index 214b68d95fb..112c5178df7 100644 --- a/crates/matrix-sdk-sqlite/src/crypto_store.rs +++ b/crates/matrix-sdk-sqlite/src/crypto_store.rs @@ -33,7 +33,9 @@ use matrix_sdk_crypto::{ TrackedUser, }; use matrix_sdk_store_encryption::StoreCipher; -use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, RoomId, TransactionId, UserId}; +use ruma::{ + DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, RoomId, TransactionId, UserId, +}; use rusqlite::OptionalExtension; use serde::{de::DeserializeOwned, Serialize}; use tokio::{fs, sync::Mutex}; @@ -193,7 +195,7 @@ impl SqliteCryptoStore { } } -const DATABASE_VERSION: u8 = 6; +const DATABASE_VERSION: u8 = 7; /// Run migrations for the given version of the database. async fn run_migrations(conn: &SqliteConn, version: u8) -> Result<()> { @@ -254,6 +256,13 @@ async fn run_migrations(conn: &SqliteConn, version: u8) -> Result<()> { .await?; } + if version < 7 { + conn.with_transaction(|txn| { + txn.execute_batch(include_str!("../migrations/crypto_store/007_lock_leases.sql")) + }) + .await?; + } + conn.set_kv("version", vec![DATABASE_VERSION]).await?; Ok(()) @@ -1145,11 +1154,44 @@ impl CryptoStore for SqliteCryptoStore { Ok(num_touched == 1) } + + async fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Result { + let key = key.to_owned(); + let holder = holder.to_owned(); + + let now_ts: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); + let expiration_ts = now_ts + lease_duration_ms as u64; + + let num_touched = self + .acquire() + .await? + .with_transaction(move |txn| { + txn.execute( + "INSERT INTO lease_locks (key, holder, expiration_ts) + VALUES (?1, ?2, ?3) + ON CONFLICT (key) + DO + UPDATE SET holder = ?2, expiration_ts = ?3 + WHERE holder = ?2 + OR expiration_ts < ?4 + ", + (key, holder, expiration_ts, now_ts), + ) + }) + .await?; + + Ok(num_touched == 1) + } } #[cfg(test)] mod tests { - use matrix_sdk_crypto::cryptostore_integration_tests; + use matrix_sdk_crypto::{cryptostore_integration_tests, cryptostore_integration_tests_time}; use once_cell::sync::Lazy; use tempfile::{tempdir, TempDir}; @@ -1166,11 +1208,12 @@ mod tests { } cryptostore_integration_tests!(); + cryptostore_integration_tests_time!(); } #[cfg(test)] mod encrypted_tests { - use matrix_sdk_crypto::cryptostore_integration_tests; + use matrix_sdk_crypto::{cryptostore_integration_tests, cryptostore_integration_tests_time}; use once_cell::sync::Lazy; use tempfile::{tempdir, TempDir}; @@ -1188,4 +1231,5 @@ mod encrypted_tests { } cryptostore_integration_tests!(); + cryptostore_integration_tests_time!(); } diff --git a/crates/matrix-sdk-ui/src/encryption_sync/mod.rs b/crates/matrix-sdk-ui/src/encryption_sync/mod.rs index 7d6c6b8e275..fce89a3eefd 100644 --- a/crates/matrix-sdk-ui/src/encryption_sync/mod.rs +++ b/crates/matrix-sdk-ui/src/encryption_sync/mod.rs @@ -26,12 +26,13 @@ //! //! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension -use std::{ops::Not as _, time::Duration}; +use std::time::Duration; use async_stream::stream; use futures_core::stream::Stream; use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{Client, SlidingSync}; +use matrix_sdk_crypto::store::locks::CryptoStoreLock; use ruma::{api::client::sync::sync_events::v4, assign}; use tracing::{error, trace}; @@ -110,7 +111,7 @@ impl EncryptionSync { let mut mode = self.mode; loop { - match &mut mode { + let guard = match &mut mode { EncryptionSyncMode::RunFixedIterations(ref mut val) => { if *val == 0 { // The previous attempt was the last one, stop now. @@ -119,31 +120,49 @@ impl EncryptionSync { // Soon. *val -= 1; - if self + let mut guard = self .client .encryption() .try_lock_store_once() .await - .map_err(Error::LockError)? - .not() - { + .map_err(Error::LockError)?; + + if guard.is_none() { // If we can't acquire the cross-process lock on the first attempt, - // that means the main process is running. Don't even try to sync, in - // that case. + // that means the main process is running, or its lease hasn't expired + // yet. In case it's the latter, wait a bit and retry. tracing::debug!( - "Lock was already taken, and we're not the main loop; aborting." + "Lock was already taken, and we're not the main loop; retrying in {}ms...", + CryptoStoreLock::LEASE_DURATION_MS ); - return; + + tokio::time::sleep(Duration::from_millis( + CryptoStoreLock::LEASE_DURATION_MS.into(), + )) + .await; + + guard = self + .client + .encryption() + .try_lock_store_once() + .await + .map_err(Error::LockError)?; + + if guard.is_none() { + tracing::debug!("Second attempt at locking outside the main app failed, so aborting."); + return; + } } - } - EncryptionSyncMode::NeverStop => { - self.client - .encryption() - .spin_lock_store(Some(60000)) - .await - .map_err(Error::LockError)?; + guard } + + EncryptionSyncMode::NeverStop => self + .client + .encryption() + .spin_lock_store(Some(60000)) + .await + .map_err(Error::LockError)?, }; match sync.next().await { @@ -157,25 +176,25 @@ impl EncryptionSync { error!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API"); } - self.client.encryption().unlock_store().await.map_err(Error::LockError)?; - // Cool cool, let's do it again. trace!("Encryption sync received an update!"); + + drop(guard); + yield Ok(()); continue; } Some(Err(err)) => { - self.client.encryption().unlock_store().await.map_err(Error::LockError)?; - trace!("Encryption sync stopped because of an error: {err:#}"); + + drop(guard); + yield Err(Error::SlidingSync(err)); break; } None => { - self.client.encryption().unlock_store().await.map_err(Error::LockError)?; - trace!("Encryption sync properly terminated."); break; } diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index b635a991e5d..c97648c47ac 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -185,8 +185,8 @@ async fn retry_order() { assert_eq!(value.content().as_message().unwrap().body(), "First!"); }); - // Wait 200ms for the first msg, 100ms for the second, 200ms for overhead - sleep(Duration::from_millis(500)).await; + // Wait 200ms for the first msg, 100ms for the second, 300ms for overhead + sleep(Duration::from_millis(600)).await; // The second item should be updated first, since it was retried first assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => { diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 13a3c0108e5..e26cfa685ac 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -20,7 +20,6 @@ use std::{ collections::{BTreeMap, HashSet}, io::{Cursor, Read, Write}, iter, - ops::Not as _, path::PathBuf, }; @@ -29,7 +28,10 @@ use futures_util::{ future::try_join, stream::{self, StreamExt}, }; -use matrix_sdk_base::crypto::{OlmMachine, OutgoingRequest, RoomMessageRequest, ToDeviceRequest}; +use matrix_sdk_base::crypto::{ + store::locks::CryptoStoreLockGuard, OlmMachine, OutgoingRequest, RoomMessageRequest, + ToDeviceRequest, +}; use ruma::{ api::client::{ backup::add_backup_keys::v3::Response as KeysBackupResponse, @@ -876,46 +878,40 @@ impl Encryption { /// /// May reload the `OlmMachine`, after obtaining the lock but not on the /// first time. - pub async fn spin_lock_store(&self, max_backoff: Option) -> Result<(), Error> { + pub async fn spin_lock_store( + &self, + max_backoff: Option, + ) -> Result, Error> { if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() { - if lock.try_lock_once().await?.not() { - // We didn't get the lock on the first attempt, so that means that another - // process is using it. Wait for it to release it. - lock.spin_lock(max_backoff).await?; - - // As we didn't get the lock on the first attempt, force-reload all the crypto - // state caches at once, by recreating the OlmMachine from scratch. - if let Err(err) = self.client.base_client().regenerate_olm().await { - // First, give back the cross-process lock. - lock.unlock().await?; - // Then return the error to the caller. - return Err(err.into()); - }; + match lock.try_lock_once().await? { + Some(guard) => Ok(Some(guard)), + None => { + // We didn't get the lock on the first attempt, so that means that another + // process is using it. Wait for it to release it. + let guard = lock.spin_lock(max_backoff).await?; + + // As we didn't get the lock on the first attempt, force-reload all the crypto + // state caches at once, by recreating the OlmMachine from scratch. + self.client.base_client().regenerate_olm().await?; + + Ok(Some(guard)) + } } + } else { + Ok(None) } - Ok(()) } /// If a lock was created with [`Self::enable_cross_process_store_lock`], /// attempts to lock it once. /// - /// Returns whether the lock was obtained or not. - pub async fn try_lock_store_once(&self) -> Result { + /// Returns a guard to the lock, if it was obtained. + pub async fn try_lock_store_once(&self) -> Result, Error> { if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() { - return Ok(lock.try_lock_once().await?); - } - Ok(false) - } - - /// If a lock was created with [`Self::enable_cross_process_store_lock`], - /// unlocks it. - /// - /// This may return an error if we were not the lock's owner. - pub async fn unlock_store(&self) -> Result<(), Error> { - if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() { - lock.unlock().await?; + Ok(lock.try_lock_once().await?) + } else { + Ok(None) } - Ok(()) } /// Manually request that the internal crypto caches be reloaded. diff --git a/crates/matrix-sdk/src/room/joined/mod.rs b/crates/matrix-sdk/src/room/joined/mod.rs index c3e32e43e47..bcc01ec5338 100644 --- a/crates/matrix-sdk/src/room/joined/mod.rs +++ b/crates/matrix-sdk/src/room/joined/mod.rs @@ -412,13 +412,9 @@ impl Joined { }; // Take and release the lock on the store, if needs be. - self.client.encryption().spin_lock_store(Some(60000)).await?; + let _guard = self.client.encryption().spin_lock_store(Some(60000)).await?; - let result = inner().await; - - self.client.encryption().unlock_store().await?; - - result + inner().await } /// Share a group session for a room.