From 17f25640cb2495e0223e5844e3c98f52a7aa901c Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Thu, 24 Oct 2024 12:08:26 -0400 Subject: [PATCH] fix(katana-rpc): event idx not respecting cursor's block idx (#2578) --- crates/katana/rpc/rpc/src/utils/events.rs | 50 +++++++++++++++++------ 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/crates/katana/rpc/rpc/src/utils/events.rs b/crates/katana/rpc/rpc/src/utils/events.rs index 2df9ef2f03..b761af08f2 100644 --- a/crates/katana/rpc/rpc/src/utils/events.rs +++ b/crates/katana/rpc/rpc/src/utils/events.rs @@ -70,7 +70,7 @@ impl Cursor { } } -/// A partial cursor that points to a specific event within a transaction. +/// A partial cursor that points to a specific event WITHIN a transaction. #[derive(Debug, Clone, PartialEq, Default)] struct PartialCursor { /// The transaction index within a block. @@ -153,12 +153,11 @@ pub fn fetch_events_at_blocks( let block_range = cursor.block..=*block_range.end(); for block_num in block_range { + // collect all receipts at `block_num` block. let block_hash = provider.block_hash_by_num(block_num)?.context("Missing block hash")?; let receipts = provider.receipts_by_block(block_num.into())?.context("Missing receipts")?; - let body_index = provider.block_body_indices(block_num.into())?.context("Missing block body index")?; - let tx_hashes = provider.transaction_hashes_in_range(body_index.into())?; if block_num == cursor.block { @@ -178,16 +177,26 @@ pub fn fetch_events_at_blocks( .enumerate() .skip(total_tx_to_skip) { - // we should only skip for the last txn pointed by the cursor. + // Determine the next event index to start processing. + let next_event = + // Check if the block AND tx we're currently processing is exactly the one pointed by the cursor. + // + // If yes, then we check whether (1) the event index pointed by the cursor is less than + // OR (2) exceed the total number of events in the current transaction. if block_num == cursor.block && tx_idx == cursor.txn.idx { - match events.len().cmp(&cursor.txn.event) { - Ordering::Greater => {} - Ordering::Less | Ordering::Equal => continue, + // If its (1), then that means there are still some events left to process in + // the current transaction. Else if its (2), meaning the cursor is pointing to either the + // last event or out of bound, which we can just skip to the next transaction. + match cursor.txn.event.cmp(&events.len()) { + Ordering::Less => cursor.txn.event, + Ordering::Greater | Ordering::Equal => continue, } } + // If we're not processing the block and tx pointed by the cursor, then we start from 0 + else { + 0 + }; - // we should only skip for the last txn pointed by the cursor. - let next_event = if tx_idx == cursor.txn.idx { cursor.txn.event } else { 0 }; let partial_cursor = fetch_tx_events( next_event, Some(block_num), @@ -267,8 +276,23 @@ impl<'a, I: Iterator> Iterator for FilteredEvents<'a, I> { } } -// returns a cursor if it couldn't include all the events of the current transaction because -// the buffer is already full. otherwise none. +/// Fetches events from a transaction, applying filters and respecting chunk size limits. +/// +/// Returns a cursor if it couldn't include all the events of the current transaction because +/// the buffer is already full. Otherwise, if it is able to include all the transactions, +/// returns None. +/// +/// # Arguments +/// +/// * `next_event_idx` - The index of the transaction in the current transaction to start from +/// * `block_number` - Block number of the current transaction +/// * `block_hash` - Block hash of the current transaction +/// * `tx_idx` - Index of the current transaction in the block +/// * `tx_hash` - Hash of the current transaction +/// * `events` - All events in the current transaction +/// * `filter` - The filter to apply on the events +/// * `chunk_size` - Maximum number of events that can be taken, based on user-specified chunk size +/// * `buffer` - Buffer to store the matched events #[allow(clippy::too_many_arguments)] fn fetch_tx_events( next_event_idx: usize, @@ -295,6 +319,7 @@ fn fetch_tx_events( transaction_hash: tx_hash, from_address: e.from_address.into(), }) + // enumerate so that we can keep track of the event's index in the transaction .enumerate() .skip(next_event_idx) .take(total_can_take) @@ -304,10 +329,9 @@ fn fetch_tx_events( let total_events_traversed = next_event_idx + total_can_take; // get the index of the last matching event that we have reached. if there is not - // matching events (ie `filtered` is empty) we point the end of the chunk + // matching events (ie `filtered` is empty) we point to the end of the chunk // we've covered thus far using the iterator.. let last_event_idx = filtered.last().map(|(idx, _)| *idx).unwrap_or(total_events_traversed); - buffer.extend(filtered.into_iter().map(|(_, event)| event)); if buffer.len() >= chunk_size {