Skip to content

Commit

Permalink
fix(katana-rpc): event idx not respecting cursor's block idx (#2578)
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy authored Oct 24, 2024
1 parent 615e9fd commit 17f2564
Showing 1 changed file with 37 additions and 13 deletions.
50 changes: 37 additions & 13 deletions crates/katana/rpc/rpc/src/utils/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Cursor {
}
}

/// A partial cursor that points to a specific event within a transaction.
/// A partial cursor that points to a specific event WITHIN a transaction.
#[derive(Debug, Clone, PartialEq, Default)]
struct PartialCursor {
/// The transaction index within a block.
Expand Down Expand Up @@ -153,12 +153,11 @@ pub fn fetch_events_at_blocks(
let block_range = cursor.block..=*block_range.end();

for block_num in block_range {
// collect all receipts at `block_num` block.
let block_hash = provider.block_hash_by_num(block_num)?.context("Missing block hash")?;
let receipts = provider.receipts_by_block(block_num.into())?.context("Missing receipts")?;

let body_index =
provider.block_body_indices(block_num.into())?.context("Missing block body index")?;

let tx_hashes = provider.transaction_hashes_in_range(body_index.into())?;

if block_num == cursor.block {
Expand All @@ -178,16 +177,26 @@ pub fn fetch_events_at_blocks(
.enumerate()
.skip(total_tx_to_skip)
{
// we should only skip for the last txn pointed by the cursor.
// Determine the next event index to start processing.
let next_event =
// Check if the block AND tx we're currently processing is exactly the one pointed by the cursor.
//
// If yes, then we check whether (1) the event index pointed by the cursor is less than
// OR (2) exceed the total number of events in the current transaction.
if block_num == cursor.block && tx_idx == cursor.txn.idx {
match events.len().cmp(&cursor.txn.event) {
Ordering::Greater => {}
Ordering::Less | Ordering::Equal => continue,
// If its (1), then that means there are still some events left to process in
// the current transaction. Else if its (2), meaning the cursor is pointing to either the
// last event or out of bound, which we can just skip to the next transaction.
match cursor.txn.event.cmp(&events.len()) {
Ordering::Less => cursor.txn.event,
Ordering::Greater | Ordering::Equal => continue,
}
}
// If we're not processing the block and tx pointed by the cursor, then we start from 0
else {
0
};

// we should only skip for the last txn pointed by the cursor.
let next_event = if tx_idx == cursor.txn.idx { cursor.txn.event } else { 0 };
let partial_cursor = fetch_tx_events(
next_event,
Some(block_num),
Expand Down Expand Up @@ -267,8 +276,23 @@ impl<'a, I: Iterator<Item = &'a Event>> Iterator for FilteredEvents<'a, I> {
}
}

// returns a cursor if it couldn't include all the events of the current transaction because
// the buffer is already full. otherwise none.
/// Fetches events from a transaction, applying filters and respecting chunk size limits.
///
/// Returns a cursor if it couldn't include all the events of the current transaction because
/// the buffer is already full. Otherwise, if it is able to include all the transactions,
/// returns None.
///
/// # Arguments
///
/// * `next_event_idx` - The index of the transaction in the current transaction to start from
/// * `block_number` - Block number of the current transaction
/// * `block_hash` - Block hash of the current transaction
/// * `tx_idx` - Index of the current transaction in the block
/// * `tx_hash` - Hash of the current transaction
/// * `events` - All events in the current transaction
/// * `filter` - The filter to apply on the events
/// * `chunk_size` - Maximum number of events that can be taken, based on user-specified chunk size
/// * `buffer` - Buffer to store the matched events
#[allow(clippy::too_many_arguments)]
fn fetch_tx_events(
next_event_idx: usize,
Expand All @@ -295,6 +319,7 @@ fn fetch_tx_events(
transaction_hash: tx_hash,
from_address: e.from_address.into(),
})
// enumerate so that we can keep track of the event's index in the transaction
.enumerate()
.skip(next_event_idx)
.take(total_can_take)
Expand All @@ -304,10 +329,9 @@ fn fetch_tx_events(
let total_events_traversed = next_event_idx + total_can_take;

// get the index of the last matching event that we have reached. if there is not
// matching events (ie `filtered` is empty) we point the end of the chunk
// matching events (ie `filtered` is empty) we point to the end of the chunk
// we've covered thus far using the iterator..
let last_event_idx = filtered.last().map(|(idx, _)| *idx).unwrap_or(total_events_traversed);

buffer.extend(filtered.into_iter().map(|(_, event)| event));

if buffer.len() >= chunk_size {
Expand Down

0 comments on commit 17f2564

Please sign in to comment.