Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use correct index for the conclusion feed. #561

Merged
merged 6 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions event-svc/src/store/sql/access/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use ceramic_core::{event_id::InvalidEventId, Cid, EventId, NodeId};
use ceramic_event::unvalidated;
use ceramic_sql::sqlite::{SqlitePool, SqliteTransaction};
use ipld_core::ipld::Ipld;
use itertools::Itertools;
use recon::{AssociativeHash, HashCount, Key, Sha256a};

use crate::store::{
Expand Down Expand Up @@ -370,17 +371,30 @@ impl CeramicOneEvent {
all_blocks.iter().map(|row| row.block.clone()).collect(),
)
.await?;

// We need to match up the delivered index with each event. However all_blocks contains an
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are keeping multiple copies of data in memory, can we move this logic in the SQL query itself?

    let max_highwater = sqlx::query_as::<_, DeliveredEventBlockRow>(
        r#"
        SELECT
            event_id,
            MAX(delivered) AS max_delivered,
            block_data
        FROM
            events_table
        WHERE
            delivered > ?  -- Only fetch events with a delivered value greater than the highwater mark
        GROUP BY
            event_id
        ORDER BY
            max_delivered ASC
        LIMIT ?
        "#,
    ) 
    ```
    What do you think? Is this a better approach?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we can call,

.bind(highwater)
.bind(limit)
.fetch_all(pool.reader())
.await?
.into_iter()
.map(|row| {
        // Decode the CAR file from the block data
     ....

        // Add the parsed event to the list
     ....

      // Update the max highwater mark
    })
    .max()
    .unwrap_or(highwater); // Use the input highwater if no new events are found ```

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The challenge is that we need events, but we have to construct the events from the list of blocks that make up each event. So the all_block query needs to join with the blocks tables in order to get all the data. This PR doesn't change how much memory is needed (we were previously keeping a copy of the data in memory). I think optimizing this query should be future work.(If we find its a cause of slow down)

// item for each block within each event. We need to chunk all_blocks by event and then
// find the max delivered for each event. This will create an iterator of a single
// delivered value for each event. With that iterator we can zip it with the parsed block
// car files as there is a 1:1 mapping.
let event_chunks = all_blocks
.into_iter()
.chunk_by(|block| block.block.order_key.clone());
let delivered_iter = event_chunks
.into_iter()
.map(|(_, event_chunk)| event_chunk.map(|block| block.delivered).max());

let result: Result<Vec<EventRowDelivered>> = parsed
.into_iter()
.zip(all_blocks.iter())
.map(|((_, carfile), block)| {
.zip(delivered_iter)
.map(|((_, carfile), delivered)| {
let (cid, event) =
unvalidated::Event::<Ipld>::decode_car(carfile.as_slice(), false)
.map_err(|_| Error::new_fatal(anyhow!("Error parsing event row")))?;
Ok(EventRowDelivered {
cid,
event,
delivered: block.delivered,
delivered: delivered.expect("should always be one block per event"),
})
})
.collect();
Expand Down
14 changes: 7 additions & 7 deletions event-svc/src/tests/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,13 +637,13 @@ async fn test_conclusion_events_since() -> Result<(), Box<dyn std::error::Error>
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
| 0 | Data | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 6e756c6c | [] |
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
| 2 | Data | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerap4iyp25kvufzcesbi4ijfseeb4koayw2m2y2wtk3pkm3nb2iezaq | 7b2273747265616d32223a22646174615f31227d | [Cid(bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i)] |
| 3 | Data | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerap4iyp25kvufzcesbi4ijfseeb4koayw2m2y2wtk3pkm3nb2iezaq | 7b2273747265616d32223a22646174615f31227d | [Cid(bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i)] |
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
| 3 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq | 7b2273747265616d5f31223a22646174615f31227d | [Cid(bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q)] |
| 4 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq | 7b2273747265616d5f31223a22646174615f31227d | [Cid(bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q)] |
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
| 3 | Time | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreiftj6l432kco7hnb6reklbd7bh2j4jbg5beuvtxp3rhgny7omgali | | [Cid(bagcqcerap4iyp25kvufzcesbi4ijfseeb4koayw2m2y2wtk3pkm3nb2iezaq)] |
| 5 | Time | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreiftj6l432kco7hnb6reklbd7bh2j4jbg5beuvtxp3rhgny7omgali | | [Cid(bagcqcerap4iyp25kvufzcesbi4ijfseeb4koayw2m2y2wtk3pkm3nb2iezaq)] |
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
| 4 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqceraccgbaicjznz45ov4wgc3wnr62zqwba24sxzreqerlzdklidysfdq | 7b2273747265616d5f31223a22646174615f32227d | [Cid(bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq)] |
| 6 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqceraccgbaicjznz45ov4wgc3wnr62zqwba24sxzreqerlzdklidysfdq | 7b2273747265616d5f31223a22646174615f32227d | [Cid(bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq)] |
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
"#]].assert_eq(&events_to_table(&conclusion_events));

Expand All @@ -656,11 +656,11 @@ async fn test_conclusion_events_since() -> Result<(), Box<dyn std::error::Error>
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
| 3 | Data | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerap4iyp25kvufzcesbi4ijfseeb4koayw2m2y2wtk3pkm3nb2iezaq | 7b2273747265616d32223a22646174615f31227d | [Cid(bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i)] |
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
| 3 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq | 7b2273747265616d5f31223a22646174615f31227d | [Cid(bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q)] |
| 4 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq | 7b2273747265616d5f31223a22646174615f31227d | [Cid(bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q)] |
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
| 4 | Time | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreiftj6l432kco7hnb6reklbd7bh2j4jbg5beuvtxp3rhgny7omgali | | [Cid(bagcqcerap4iyp25kvufzcesbi4ijfseeb4koayw2m2y2wtk3pkm3nb2iezaq)] |
| 5 | Time | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreiftj6l432kco7hnb6reklbd7bh2j4jbg5beuvtxp3rhgny7omgali | | [Cid(bagcqcerap4iyp25kvufzcesbi4ijfseeb4koayw2m2y2wtk3pkm3nb2iezaq)] |
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
| 4 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqceraccgbaicjznz45ov4wgc3wnr62zqwba24sxzreqerlzdklidysfdq | 7b2273747265616d5f31223a22646174615f32227d | [Cid(bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq)] |
| 6 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqceraccgbaicjznz45ov4wgc3wnr62zqwba24sxzreqerlzdklidysfdq | 7b2273747265616d5f31223a22646174615f32227d | [Cid(bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq)] |
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------+----------------------------------------------------------------------+
"#]].assert_eq(&events_to_table(&conclusion_events));

Expand Down