Skip to content

Commit

Permalink
fix(indexeddb): Fix broken raw redacted state events
Browse files Browse the repository at this point in the history
A fix for (de)serialization of redacted state events in Ruma made
old events undeserializable.
Bump the DB version.

Signed-off-by: Kévin Commaille <zecakeh@tedomum.fr>
  • Loading branch information
zecakeh authored and jplatte committed Feb 11, 2023
1 parent 5ae84a9 commit 9707d73
Showing 1 changed file with 140 additions and 21 deletions.
161 changes: 140 additions & 21 deletions crates/matrix-sdk-indexeddb/src/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use ruma::{
CanonicalJsonObject, EventId, MxcUri, OwnedEventId, OwnedUserId, RoomId, RoomVersionId, UserId,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue};
use tracing::{debug, warn};
use wasm_bindgen::JsValue;
use web_sys::IdbKeyRange;
Expand Down Expand Up @@ -109,7 +110,7 @@ impl From<IndexeddbStateStoreError> for StoreError {
mod KEYS {
// STORES

pub const CURRENT_DB_VERSION: f64 = 1.1;
pub const CURRENT_DB_VERSION: f64 = 1.2;
pub const CURRENT_META_DB_VERSION: f64 = 2.0;

pub const INTERNAL_STATE: &str = "matrix-sdk-state";
Expand Down Expand Up @@ -237,6 +238,74 @@ async fn backup(source: &IdbDatabase, meta: &IdbDatabase) -> Result<()> {
Ok(())
}

fn serialize_event(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
Ok(match store_cipher {
Some(cipher) => JsValue::from_serde(&cipher.encrypt_value_typed(event)?)?,
None => JsValue::from_serde(event)?,
})
}

fn deserialize_event<T: DeserializeOwned>(
store_cipher: Option<&StoreCipher>,
event: JsValue,
) -> Result<T> {
match store_cipher {
Some(cipher) => Ok(cipher.decrypt_value_typed(event.into_serde()?)?),
None => Ok(event.into_serde()?),
}
}

async fn v1_2_fix_store(
store: &IdbObjectStore<'_>,
store_cipher: Option<&StoreCipher>,
) -> Result<()> {
fn maybe_fix_json(raw_json: &RawJsonValue) -> Result<Option<JsonValue>> {
let json = raw_json.get();

if json.contains(r#""content":null"#) {
let mut value: JsonValue = serde_json::from_str(json)?;
if let Some(content) = value.get_mut("content") {
if matches!(content, JsonValue::Null) {
*content = JsonValue::Object(Default::default());
return Ok(Some(value));
}
}
}

Ok(None)
}

let cursor = store.open_cursor()?.await?;

if let Some(cursor) = cursor {
loop {
let raw_json: Box<RawJsonValue> = deserialize_event(store_cipher, cursor.value())?;

if let Some(fixed_json) = maybe_fix_json(&raw_json)? {
cursor.update(&serialize_event(store_cipher, &fixed_json)?)?.await?;
}

if !cursor.continue_cursor()?.await? {
break;
}
}
}

Ok(())
}

async fn migrate_to_v1_2(db: &IdbDatabase, store_cipher: Option<&StoreCipher>) -> Result<()> {
let tx = db.transaction_on_multi_with_mode(
&[KEYS::ROOM_STATE, KEYS::ROOM_INFOS],
IdbTransactionMode::Readwrite,
)?;

v1_2_fix_store(&tx.object_store(KEYS::ROOM_STATE)?, store_cipher).await?;
v1_2_fix_store(&tx.object_store(KEYS::ROOM_INFOS)?, store_cipher).await?;

tx.await.into_result().map_err(|e| e.into())
}

#[derive(Builder, Debug, PartialEq, Eq)]
#[builder(name = "IndexeddbStateStoreBuilder", build_fn(skip))]
pub struct IndexeddbStateStoreBuilderConfig {
Expand Down Expand Up @@ -311,7 +380,8 @@ impl IndexeddbStateStoreBuilder {
None
};

let recreate_stores = {
let mut recreate_stores = false;
{
// checkup up in a separate call, whether we have to backup or do anything else
// to the db. Unfortunately the set_on_upgrade_needed doesn't allow async fn
// which we need to execute the backup.
Expand All @@ -337,15 +407,16 @@ impl IndexeddbStateStoreBuilder {
let old_version = pre_db.version();

if created.load(Ordering::Relaxed) {
// this is a fresh DB, return
false
// this is a fresh DB, nothing to do
} else if old_version == 1.0 && has_store_cipher {
match migration_strategy {
MigrationConflictStrategy::BackupAndDrop => {
backup(&pre_db, &meta_db).await?;
true
recreate_stores = true;
}
MigrationConflictStrategy::Drop => {
recreate_stores = true;
}
MigrationConflictStrategy::Drop => true,
MigrationConflictStrategy::Raise => {
return Err(IndexeddbStateStoreError::MigrationConflict {
name,
Expand All @@ -354,11 +425,12 @@ impl IndexeddbStateStoreBuilder {
})
}
}
} else if old_version < 1.2 {
migrate_to_v1_2(&pre_db, store_cipher.as_deref()).await?;
} else {
// Nothing to be done
false
}
};
}

let mut db_req: OpenDbRequest = IdbDatabase::open_f64(&name, KEYS::CURRENT_DB_VERSION)?;
db_req.set_on_upgrade_needed(Some(
Expand Down Expand Up @@ -421,17 +493,11 @@ impl IndexeddbStateStore {
}

fn serialize_event(&self, event: &impl Serialize) -> Result<JsValue> {
Ok(match &self.store_cipher {
Some(cipher) => JsValue::from_serde(&cipher.encrypt_value_typed(event)?)?,
None => JsValue::from_serde(event)?,
})
serialize_event(self.store_cipher.as_deref(), event)
}

fn deserialize_event<T: DeserializeOwned>(&self, event: JsValue) -> Result<T> {
match &self.store_cipher {
Some(cipher) => Ok(cipher.decrypt_value_typed(event.into_serde()?)?),
None => Ok(event.into_serde()?),
}
deserialize_event(self.store_cipher.as_deref(), event)
}

fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
Expand Down Expand Up @@ -1451,15 +1517,21 @@ mod migration_tests {

use indexed_db_futures::prelude::*;
use matrix_sdk_test::async_test;
use ruma::{
events::{AnySyncStateEvent, StateEventType},
room_id,
};
use serde_json::json;
use uuid::Uuid;
use wasm_bindgen::JsValue;

use super::{
IndexeddbStateStore, IndexeddbStateStoreError, MigrationConflictStrategy, Result,
ALL_STORES,
serialize_event, IndexeddbStateStore, IndexeddbStateStoreError, MigrationConflictStrategy,
Result, ALL_STORES, KEYS,
};
use crate::safe_encode::SafeEncode;

pub async fn create_fake_db(name: &str, version: f64) -> Result<()> {
pub async fn create_fake_db(name: &str, version: f64) -> Result<IdbDatabase> {
let mut db_req: OpenDbRequest = IdbDatabase::open_f64(name, version)?;
db_req.set_on_upgrade_needed(Some(
move |evt: &IdbVersionChangeEvent| -> Result<(), JsValue> {
Expand All @@ -1471,8 +1543,7 @@ mod migration_tests {
Ok(())
},
));
db_req.into_future().await?;
Ok(())
db_req.into_future().await.map_err(Into::into)
}

#[async_test]
Expand Down Expand Up @@ -1565,4 +1636,52 @@ mod migration_tests {
}
Ok(())
}

#[async_test]
pub async fn test_migrating_to_v1_2() -> Result<()> {
let name = format!("migrating-1.2-{}", Uuid::new_v4().as_hyphenated().to_string());
// An event that fails to deserialize.
let wrong_redacted_state_event = json!({
"content": null,
"event_id": "$wrongevent",
"origin_server_ts": 1673887516047_u64,
"sender": "@example:localhost",
"state_key": "",
"type": "m.room.topic",
"unsigned": {
"redacted_because": {
"type": "m.room.redaction",
"sender": "@example:localhost",
"content": {},
"redacts": "$wrongevent",
"origin_server_ts": 1673893816047_u64,
"unsigned": {},
"event_id": "$redactionevent",
},
},
});
serde_json::from_value::<AnySyncStateEvent>(wrong_redacted_state_event.clone())
.unwrap_err();

let room_id = room_id!("!some_room:localhost");

// Populate DB with wrong event.
{
let db = create_fake_db(&name, 1.1).await?;
let tx =
db.transaction_on_one_with_mode(KEYS::ROOM_STATE, IdbTransactionMode::Readwrite)?;
let state = tx.object_store(KEYS::ROOM_STATE)?;
let key = (room_id, StateEventType::RoomTopic, "").encode();
state.put_key_val(&key, &serialize_event(None, &wrong_redacted_state_event)?)?;
tx.await.into_result()?;
}

// this transparently migrates to the latest version
let store = IndexeddbStateStore::builder().name(name).build().await?;
let event =
store.get_state_event(room_id, StateEventType::RoomTopic, "").await.unwrap().unwrap();
event.deserialize().unwrap();

Ok(())
}
}

0 comments on commit 9707d73

Please sign in to comment.