feat: Introduce CryptoStore::get_inbound_group_sessions_stream
#3060
This patch implements 2 new types: `StreamByCursor` and `StreamByRenewedCursor` to iterate over large collections of values from an IndexedDB storage. See the types' documentation to learn more.
…ait. This patch implements the `get_inbound_group_sessions_stream` method onto the `CryptoStore` trait. In order for the trait to continue being object-safe, this patch implements a `StreamOf` struct that simply wraps any `T` where `T: Stream`. `StreamOf` also implements `Stream` and forwards everything to its inner stream. This patch finally updates the test suite of `matrix-sdk-crypto` to test this new `get_inbound_group_sessions_stream` method.
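The wrapper idea described in this commit message can be sketched with the standard library alone. This is only an analogy, not the PR's code: it swaps `Stream` for `Iterator` so that it compiles without external crates, and `IterOf`, `Store` and `MemoryStore` are hypothetical names. The point is that a trait method returning one concrete wrapper type (which boxes the real iterator) can still be called through `dyn Store`.

```rust
/// Hypothetical analogue of `StreamOf`, built on `Iterator` instead of `Stream`.
/// It hides the concrete iterator type behind a box and forwards `next`.
struct IterOf<T>(Box<dyn Iterator<Item = T>>);

impl<T> IterOf<T> {
    fn new<I>(inner: I) -> Self
    where
        I: Iterator<Item = T> + 'static,
    {
        Self(Box::new(inner))
    }
}

impl<T> Iterator for IterOf<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        // Forward everything to the inner iterator.
        self.0.next()
    }
}

/// Hypothetical store trait. Because the method returns a concrete type,
/// the trait can still be used as `dyn Store`.
trait Store {
    fn sessions(&self) -> IterOf<u32>;
}

struct MemoryStore {
    sessions: Vec<u32>,
}

impl Store for MemoryStore {
    fn sessions(&self) -> IterOf<u32> {
        IterOf::new(self.sessions.clone().into_iter())
    }
}

/// Works on a trait object, which is what object safety buys us.
fn first_session(store: &dyn Store) -> Option<u32> {
    store.sessions().next()
}
```

The cost of this design is one allocation and dynamic dispatch per returned iterator, which is usually negligible next to the I/O behind it.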
Codecov Report

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #3060      +/-   ##
==========================================
- Coverage   83.72%   83.68%   -0.04%
==========================================
  Files         222      222
  Lines       23357    23380      +23
==========================================
+ Hits        19555    19565      +10
- Misses       3802     3815      +13
use wasm_bindgen::JsValue;
use web_sys::{DomException, IdbKeyRange};

pin_project! {

So I read a bit about `pin_project`, and I think that because we are not using `#[pin]` for any of our struct's fields, we are asserting that we are not using structural pinning, which means that all the fields can be accessed as `&mut Type` instead of `Pin<&mut Type>`.

If I'm right, please could you try to explain to me why this is ok? And I think a comment explaining this would be helpful in the code.
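As a rough illustration of the question above, here is a std-only sketch that does not use `pin_project` at all. `Counter` and `bump` are hypothetical names. It shows the underlying rule: when a type is `Unpin`, a `Pin<&mut T>` can be turned back into a plain `&mut T`, so its fields are reachable without any pinned projection.

```rust
use std::pin::Pin;

/// Hypothetical struct whose only field is `Unpin`, so the struct is `Unpin` too.
struct Counter {
    count: u32,
}

/// Receives the value pinned, yet mutates the field through a plain `&mut`.
fn bump(this: Pin<&mut Counter>) -> u32 {
    // `Pin::get_mut` is only available when the target is `Unpin`:
    // moving such a value can never invalidate it, so no pinning
    // guarantee has to be upheld for its fields.
    let inner: &mut Counter = this.get_mut();
    inner.count += 1;
    inner.count
}
```

Whether the same reasoning holds for the struct in this PR depends on what its fields actually are, which is why a comment in the code is still worth having.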
cursor.value(),
))))
} else {
// It doesn't have a new value. End of the cursor. End of the stream.

What if `continue_cursor` returned `Poll::Pending`? Why do we return `Poll::Ready`? Should we return `Poll::Pending` too?

Ah, no, I see that the `ready` macro returns `Poll::Pending` for us automatically.

Please could you add a comment somewhere explaining that, because it fooled me...
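For readers tripped up by the same thing, here is a small sketch of what `ready!` does, using `std::task::ready` from the standard library. `poll_cursor` and `poll_next` are hypothetical stand-ins, not the functions from this PR.

```rust
use std::task::{ready, Poll};

/// Hypothetical stand-in for polling the cursor: `Pending` while the request
/// is in flight, then a value, then the end of the cursor.
fn poll_cursor(step: &mut u32) -> Poll<Option<u32>> {
    *step += 1;
    match *step {
        1 => Poll::Pending,
        2 => Poll::Ready(Some(42)),
        _ => Poll::Ready(None),
    }
}

fn poll_next(step: &mut u32) -> Poll<Option<u32>> {
    // `ready!(e)` behaves roughly like:
    //     match e {
    //         Poll::Ready(value) => value,
    //         Poll::Pending => return Poll::Pending,
    //     }
    // so the code below only runs once the inner poll is ready.
    let maybe_value = ready!(poll_cursor(step));

    match maybe_value {
        // A new value: yield it.
        Some(value) => Poll::Ready(Some(value * 2)),
        // No new value: end of the cursor, end of the stream.
        None => Poll::Ready(None),
    }
}
```

The early return is invisible at the call site, which is exactly why a short comment next to the macro helps.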
/// Why a range? What is renewed? Well, that's kind of complex. An
/// [`IdbTransaction`] is dropped by the runtime (most of the time, the
/// JavaScript runtime) when the transaction is considered “idle”. It's hard to
/// estimate when a transaction will be dropped, and it can happen in

Actually, it's well-defined when a transaction is dropped - it's when you `await` anything (in JS), or at least that is how I interpret this:

> Transactions are tied very closely to the event loop. If you make a transaction and return to the event loop without using it then the transaction will become inactive. The only way to keep the transaction active is to make a request on it. When the request is finished you'll get a DOM event and, assuming that the request succeeded, you'll have another opportunity to extend the transaction during that callback. If you return to the event loop without extending the transaction then it will become inactive, and so on. As long as there are pending requests the transaction remains active.

The reason we need `StreamByRenewedCursor` is because, to avoid memory exhaustion, we need to write part of our export to disk before we have walked all records. Because writing to disk is async, we lose our transaction and need to open a new one.
/// estimate when a transaction will be dropped, and it can happen in
/// circumstances out of control. One solution to this is to _renew_ the
/// transaction every time _x_ values are fetched from it. The lower _x_ is, the
/// higher the probability for the transaction to not be dropped is.

So, based on my above comment, this is not about making dropping the transaction unlikely because less time passed: it's about having well-defined break points where we can stop streaming, write to disk, and then stream again.

// The latest known valid key.
// This value changes every time a valid value is polled. It's helpful to reposition the cursor
// when an [`IdbTransaction`] is renwed.

`renwed` -> `renewed`
where
F: FnMut() -> Result<IdbTransaction<'a>, DomException>,
{
/// Build a new `StreamByRenewdCursor`.

`StreamByRenewdCursor` -> `StreamByRenewedCursor`
};
let mut this = Box::pin(this);

unsafe {

Can we have safety docs here please?
.latest_object_store_ptr
.as_ref()
.as_ref()
.unwrap()

Worth a comment that `unwrap` is safe because we just created this thing and made a pointer to it?
let transaction = (this.transaction_builder)()?;
this.latest_transaction.set(transaction);

// Get and asve the new `object_store`.

`asve` -> `save`
this.latest_transaction.set(transaction);

// Get and asve the new `object_store`.
let object_store = unsafe { this.latest_transaction_ptr.as_ref() }

Safety doc please!
this.latest_object_store.set(Some(object_store));

// Get a new `cursor_future`.
let object_store_ref = unsafe { this.latest_object_store_ptr.as_ref() };

Safety doc please!
let object_store_ref = unsafe { this.latest_object_store_ptr.as_ref() };
let after_latest_key = IdbKeyRange::lower_bound_with_open(&this.latest_key, true)?;
let cursor_future =
object_store_ref.as_ref().unwrap().open_cursor_with_range(&after_latest_key)?;

Maybe a comment here about why `unwrap` is ok?
let store = transaction.object_store("baz")?;

let value: Option<JsValue> = store.get_owned("key3")?.await?;
assert_eq!(value.expect("value is none").as_f64().expect("boo"), 3.);

`expect("boo")` -> `unwrap()`? ;-)
transaction.await.into_result()?;
}

// Test one read. Nothing particular here.

Why are we doing this part?
transaction.await.into_result()?;
}

// Test one read. Nothing particular here.

Again, probably not needed?

let db = db.await?;

{

It might be nice to put this into a helper function to shorten the test and make it easier to understand. Same for the similar one in the later test.
Ok(db.transaction_on_one("baz")?)
},
"baz".to_owned(),
// SAFETY: `unwrap` is safe because 3 is not zero.

I would be OK without this comment since we are in tests.
);
assert_eq!(stream.try_next().await?, None);

assert_eq!(number_of_renews, 2);

I would like 2 more assertions on `number_of_renews`: when it is zero, and when it is 1.
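To make the request concrete, here is a synchronous, std-only model of "renew every n items". `RenewingCursor` is a hypothetical toy, not the PR's `StreamByRenewedCursor`, and the exact point at which the real type renews may differ. It only shows how the counter can be checked at each stage rather than at the end alone.

```rust
/// Hypothetical toy model: yields values and counts one "renewal" each time
/// `renew_every` values have been fetched and another value is requested.
struct RenewingCursor {
    values: Vec<u32>,
    position: usize,
    renew_every: usize,
    fetched_since_renew: usize,
    number_of_renews: usize,
}

impl Iterator for RenewingCursor {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        // End of the values: stop without renewing anything.
        let value = *self.values.get(self.position)?;

        // The current "transaction" has served its quota: renew it first.
        if self.fetched_since_renew == self.renew_every {
            self.number_of_renews += 1;
            self.fetched_since_renew = 0;
        }

        self.position += 1;
        self.fetched_since_renew += 1;
        Some(value)
    }
}
```

With five values and `renew_every` set to 2, the counter in this model is 0 after the first two values, 1 after the third and fourth, and 2 after the fifth, so each intermediate state can get its own assertion.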
This is totally amazing code, and I think it will do the job.

The question I have for you and @bnjbvr as maintainers is:

Do you want something this clever in your code base? We could achieve the same thing by making a `get_inbound_group_sessions_batch` method that accepts a `latest_key` argument, creates a transaction, fetches a batch, and returns it as a `Vec`.

I also think if we implemented `StreamByRenewedCursor` without `StreamByCursor` this code could be a little simpler...

Up to you.

Meanwhile, I will try out an export function based on this and see whether it works.
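The batch alternative suggested above could look roughly like this. It is a sketch under loud assumptions: a `BTreeMap` stands in for the IndexedDB object store, `u32` stands in for a session, and `get_batch` and `collect_in_batches` are hypothetical names. The exclusive lower bound plays the role of `IdbKeyRange::lower_bound_with_open(&latest_key, true)` in the PR.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

/// Fetch at most `batch_size` entries whose key is strictly after `latest_key`.
/// In the real store, each call would open its own short-lived transaction.
fn get_batch(
    store: &BTreeMap<String, u32>,
    latest_key: Option<&str>,
    batch_size: usize,
) -> Vec<(String, u32)> {
    // Exclusive lower bound: resume right after the last key already seen.
    let lower: Bound<&str> = match latest_key {
        Some(key) => Bound::Excluded(key),
        None => Bound::Unbounded,
    };
    let upper: Bound<&str> = Bound::Unbounded;

    store
        .range::<str, _>((lower, upper))
        .take(batch_size)
        .map(|(key, value)| (key.clone(), *value))
        .collect()
}

/// Drive `get_batch` until the store is exhausted, remembering the last key.
fn collect_in_batches(store: &BTreeMap<String, u32>, batch_size: usize) -> Vec<Vec<(String, u32)>> {
    let mut batches = Vec::new();
    let mut latest_key: Option<String> = None;

    loop {
        let batch = get_batch(store, latest_key.as_deref(), batch_size);
        match batch.last() {
            Some((key, _)) => latest_key = Some(key.clone()),
            // An empty batch means there is nothing left to read.
            None => break,
        }
        // This is a natural break point: the caller could write the batch
        // to disk here before asking for the next one.
        batches.push(batch);
    }

    batches
}
```

The caller owns the loop in this design, so any async work between two batches cannot outlive a transaction.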
My proposal: first off, let's see if this works as expected (it works in Rust+Wasm, but let's see in the real world!), and then let's discuss whether we can simplify things a little bit.
I've just run some tests of memory usage in Chromium with the code before this change, with #3143, and with #3143+this code. (#3143 avoids one copy by streaming directly into a JSON string, without instantiating the full list of
This implies to me that the streaming of results out of Indexed DB is not gaining us much (presumably it's not actually using more memory, and the difference here is just noise). (Test setup: 2 users with 1 room containing 10001 messages, where discardsession was run between each message, meaning there are 10001 entries in the These numbers were obtained by running export (in Element Web UI) 3 times and taking the average, with this code in matrix-react-sdk:
Raw results:
Furthermore, I am increasingly uncomfortable with this approach. We are presenting an interface that looks like a normal So I think that it would be better to present an interface that covers this complexity, by explicitly returning a batch of The downside of my alternative approach is that we have to present an interface on all platforms that is not really needed on any platforms other than Indexed DB. (However, I can imagine other platforms also preferring short-lived transactions, so maybe this interface is not completely wrong-headed.) I am imagining something like this: `async fn get_inbound_group_sessions_batched(&self, batch_size: usize) -> Result<StreamOf<store::Result<Vec<InboundGroupSession>>>>` (Or even something cleverer with const generics)
So I think my memory-usage results suggest it's not worth following this approach for now, and if/when we decide to revisit it, I would like to advocate for explicit batches. Let's wait to hear from @ara4n on whether element-hq/element-web#26681 is fixed for him with the changes that are already in.
Actually, thinking further, I think we could actually do batches without changing the external interface. Within
I'm OK with the plan. Let's see if things are improved/fixed for gigantic accounts. If not, let's revamp this PR a bit as suggested.
If/when we need to do a fully scalable version of
In some non-rare cases, the number of inbound group sessions per user can be really high. Indeed, 500'000, 1'000'000 or even more sessions aren't rare. `matrix-sdk-crypto` comes with 2 backends: `matrix-sdk-sqlite` and `matrix-sdk-indexeddb`. The former handles this number of sessions pretty well. The latter is designed for the `wasm32-unknown-unknown` target, uses IndexedDB, and struggles to handle that many sessions.

The problem is that collecting 1 million sessions in memory, in a single `Vec`, that will then be converted to 1 million JS objects, is neither memory-efficient nor CPU-efficient. Despite the recent efforts (#3012 and #3013) to solve the issue by using the `IdbCursor` API instead of `IdbObjectStore::getAll`, this is still not solved, as all values end up in memory.

This patch tries a different approach:

- `matrix-sdk-indexeddb`:
  - `StreamByCursor`, to fetch data from an object store by using a cursor, but as an async iterator: a `Stream`,
  - `StreamByRenewedCursor`, same as `StreamByCursor`, but the transaction that provides the object store that provides the cursor is renewed every n items fetched from the database. The idea is to prevent the `IdbTransaction` from being considered idle and then dropped prematurely by the JavaScript host. See its own documentation to learn more.
- The `CryptoStore` trait gains a new method, `get_inbound_group_sessions_stream`:
  - `matrix-sdk-sqlite`: the implementation isn't ideal at all, but it's not likely to be used today,
  - `matrix-sdk-indexeddb`: the implementation is acceptable and aims at being used today.
- The `matrix-sdk-crypto` test suite has been updated accordingly.

Ideally, one wants to clean up the work introduced in #3012 and #3013 if the need arises. Also, one wants to deprecate `CryptoStore::get_inbound_group_sessions` once `matrix-sdk-sqlite` has a proper `…_stream` implementation.

The PR should ideally be reviewed commit-by-commit.