Skip to content

Commit

Permalink
aggregate filter creation and storage
Browse files Browse the repository at this point in the history
  • Loading branch information
sistemd committed Nov 5, 2024
1 parent e02507e commit 6e30934
Show file tree
Hide file tree
Showing 9 changed files with 761 additions and 286 deletions.
5 changes: 5 additions & 0 deletions crates/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ license = { workspace = true }
rust-version = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
aggregate_bloom = []

default = []

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
Binary file modified crates/rpc/fixtures/mainnet.sqlite
Binary file not shown.
45 changes: 45 additions & 0 deletions crates/rpc/src/method/get_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ pub async fn get_events(
offset: requested_offset,
};

// TODO:
// Instrumentation and `AggregateBloom` version of fetching events
// for the given `EventFilter` are under a feature flag for now and
// we do not execute them during testing because they would only
// slow the tests down and would not have any impact on their outcome.
// Follow-up PR will use the `AggregateBloom` logic to create the output,
// then the conditions will be removed.

#[cfg(all(feature = "aggregate_bloom", not(test)))]
let start = std::time::Instant::now();

let page = transaction
.events(
&filter,
Expand All @@ -228,6 +239,40 @@ pub async fn get_events(
EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()),
})?;

#[cfg(all(feature = "aggregate_bloom", not(test)))]
{
let elapsed = start.elapsed();

tracing::info!(
"Getting events (individual Bloom filters) took {:?}",
elapsed
);

let start = std::time::Instant::now();
let page_from_aggregate = transaction
.events_from_aggregate(&filter, context.config.get_events_max_blocks_to_scan)
.map_err(|e| match e {
EventFilterError::Internal(e) => GetEventsError::Internal(e),
EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()),
})?;
let elapsed = start.elapsed();

tracing::info!(
"Getting events (aggregate Bloom filters) took {:?}",
elapsed
);

if page != page_from_aggregate {
tracing::error!(
"Page of events from individual and aggregate bloom filters does not match!"
);
tracing::error!("Individual: {:?}", page);
tracing::error!("Aggregate: {:?}", page_from_aggregate);
} else {
tracing::info!("Page of events from individual and aggregate bloom filters match!");
}
}

let mut events = GetEventsResult {
events: page.events.into_iter().map(|e| e.into()).collect(),
continuation_token: page.continuation_token.map(|token| {
Expand Down
76 changes: 39 additions & 37 deletions crates/rpc/src/method/subscribe_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,14 @@ mod tests {
use tokio::sync::mpsc;

use crate::context::{RpcConfig, RpcContext};
use crate::jsonrpc::{handle_json_rpc_socket, RpcRouter};
use crate::jsonrpc::{handle_json_rpc_socket, RpcRouter, CATCH_UP_BATCH_SIZE};
use crate::pending::PendingWatcher;
use crate::v02::types::syncing::Syncing;
use crate::{v08, Notifications, Reorg, SyncState};

#[tokio::test]
async fn no_filtering() {
let num_blocks = 2000;
let num_blocks = 80;
let router = setup(num_blocks).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
Expand Down Expand Up @@ -319,14 +319,14 @@ mod tests {

#[tokio::test]
async fn filter_from_address() {
let router = setup(2000).await;
let router = setup(80).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
let params = serde_json::json!(
{
"block": {"block_number": 0},
"from_address": "0x90",
"from_address": "0x46",
}
);
receiver_tx
Expand All @@ -351,7 +351,7 @@ mod tests {
}
_ => panic!("Expected text message"),
};
let expected = sample_event_message(0x90, subscription_id);
let expected = sample_event_message(0x46, subscription_id);
let event = sender_rx.recv().await.unwrap().unwrap();
let json: serde_json::Value = match event {
Message::Text(json) => serde_json::from_str(&json).unwrap(),
Expand All @@ -371,9 +371,9 @@ mod tests {
.context
.notifications
.l2_blocks
.send(sample_block(0x90).into())
.send(sample_block(0x46).into())
.unwrap();
let expected = sample_event_message(0x90, subscription_id);
let expected = sample_event_message(0x46, subscription_id);
let event = sender_rx.recv().await.unwrap().unwrap();
let json: serde_json::Value = match event {
Message::Text(json) => serde_json::from_str(&json).unwrap(),
Expand All @@ -385,14 +385,14 @@ mod tests {

#[tokio::test]
async fn filter_keys() {
let router = setup(2000).await;
let router = setup(80).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
let params = serde_json::json!(
{
"block": {"block_number": 0},
"keys": [["0x90"], [], ["0x92", "0x93"]],
"keys": [["0x46"], [], ["0x47", "0x48"]],
}
);
receiver_tx
Expand All @@ -417,7 +417,7 @@ mod tests {
}
_ => panic!("Expected text message"),
};
let expected = sample_event_message(0x90, subscription_id);
let expected = sample_event_message(0x46, subscription_id);
let event = sender_rx.recv().await.unwrap().unwrap();
let json: serde_json::Value = match event {
Message::Text(json) => serde_json::from_str(&json).unwrap(),
Expand All @@ -437,9 +437,9 @@ mod tests {
.context
.notifications
.l2_blocks
.send(sample_block(0x90).into())
.send(sample_block(0x46).into())
.unwrap();
let expected = sample_event_message(0x90, subscription_id);
let expected = sample_event_message(0x46, subscription_id);
let event = sender_rx.recv().await.unwrap().unwrap();
let json: serde_json::Value = match event {
Message::Text(json) => serde_json::from_str(&json).unwrap(),
Expand All @@ -451,15 +451,15 @@ mod tests {

#[tokio::test]
async fn filter_from_address_and_keys() {
let router = setup(2000).await;
let router = setup(80).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
let params = serde_json::json!(
{
"block": {"block_number": 0},
"from_address": "0x90",
"keys": [["0x90"], [], ["0x92", "0x93"]],
"from_address": "0x46",
"keys": [["0x46"], [], ["0x47", "0x48"]],
}
);
receiver_tx
Expand All @@ -484,7 +484,7 @@ mod tests {
}
_ => panic!("Expected text message"),
};
let expected = sample_event_message(0x90, subscription_id);
let expected = sample_event_message(0x46, subscription_id);
let event = sender_rx.recv().await.unwrap().unwrap();
let json: serde_json::Value = match event {
Message::Text(json) => serde_json::from_str(&json).unwrap(),
Expand All @@ -504,9 +504,9 @@ mod tests {
.context
.notifications
.l2_blocks
.send(sample_block(0x90).into())
.send(sample_block(0x46).into())
.unwrap();
let expected = sample_event_message(0x90, subscription_id);
let expected = sample_event_message(0x46, subscription_id);
let event = sender_rx.recv().await.unwrap().unwrap();
let json: serde_json::Value = match event {
Message::Text(json) => serde_json::from_str(&json).unwrap(),
Expand All @@ -518,32 +518,32 @@ mod tests {

#[tokio::test]
async fn too_many_keys_filter() {
let router = setup(2000).await;
let router = setup(80).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
let params = serde_json::json!(
{
"block": {"block_number": 0},
"from_address": "0x90",
"from_address": "0x46",
"keys": [
["0x91"],
["0x92"],
["0x93"],
["0x94"],
["0x95"],
["0x96"],
["0x97"],
["0x98"],
["0x99"],
["0x9a"],
["0x9b"],
["0x9c"],
["0x9d"],
["0x9e"],
["0x9f"],
["0xa0"],
["0xa1"],
["0x46"],
["0x47"],
["0x48"],
["0x49"],
["0x4a"],
["0x4b"],
["0x4c"],
["0x4d"],
["0x4e"],
["0x4f"],
["0x50"],
["0x51"],
["0x52"],
["0x53"],
["0x54"],
["0x55"],
["0x56"],
],
}
);
Expand Down Expand Up @@ -644,6 +644,8 @@ mod tests {
}

async fn setup(num_blocks: u64) -> RpcRouter {
assert!(num_blocks == 0 || num_blocks > CATCH_UP_BATCH_SIZE);

let storage = StorageBuilder::in_memory().unwrap();
tokio::task::spawn_blocking({
let storage = storage.clone();
Expand Down
Loading

0 comments on commit 6e30934

Please sign in to comment.