Skip to content

Commit

Permalink
feat(storage): add global cache to bloom filters
Browse files Browse the repository at this point in the history
FIXME: should remove bloom when purging a block
# Please enter the commit message for your changes. Lines starting
# with '#' will be ignored, and an empty message aborts the commit.
#
# On branch krisztian/events-intern-poc
# Changes to be committed:
#	modified:   Cargo.lock
#	modified:   crates/storage/Cargo.toml
#	modified:   crates/storage/src/bloom.rs
#	modified:   crates/storage/src/connection/event.rs
#
# Changes not staged for commit:
#	modified:   crates/crypto/src/signature/ecdsa.rs
#
# Untracked files:
#	.idea/
#	TODO.md
#	TODO2.md
#	bincode-scan.txt
#	call-flamegraph.svg
#	call.json
#	commitment.py
#	crates/gateway-types/src/pending.rs
#	crates/rpc/fixtures/contracts/dummy_account.json
#	crates/stark_hash/
#	crates/stark_poseidon/
#	events-scan.txt
#	events.json
#	events2.json
#	feeder_gateway.rest
#	get_storage_roots.py
#	identity.json
#	json-rpc.rest
#	receipts-scan.txt
#	request.json
#	snapshots.txt
#	test.py
#	test1.sh
#	test2.sh
#	test3.json
#	test_sierra_call.sh
#	trace.json
#	trace2.json
#	trace3.json
#	trace4.json
#	trace_block.json
#
  • Loading branch information
kkovaacs committed Jan 22, 2024
1 parent 0e4ef7e commit 9c2a39d
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ base64 = { workspace = true }
bincode = "2.0.0-rc.3"
bitvec = { workspace = true }
bloomfilter = "1.0.12"
cached = "0.44.0"
const_format = { workspace = true }
data-encoding = "2.4.0"
fake = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/storage/src/bloom.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bloomfilter::Bloom;
use pathfinder_crypto::Felt;

#[derive(Clone)]
pub(crate) struct BloomFilter(Bloom<Felt>);

impl BloomFilter {
Expand Down
71 changes: 61 additions & 10 deletions crates/storage/src/connection/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::{Mutex, MutexGuard};

use crate::bloom::BloomFilter;
use crate::prelude::*;

use cached::{Cached, SizedCache};
use pathfinder_common::event::Event;
use pathfinder_common::{
BlockHash, BlockNumber, ContractAddress, EventData, EventKey, TransactionHash,
Expand Down Expand Up @@ -103,10 +106,6 @@ pub(super) fn get_events(
let mut offset = filter.offset;
let key_filter_is_empty = filter.keys.iter().flatten().count() == 0;

let mut bloom_stmt = tx
.inner()
.prepare_cached("SELECT bloom FROM starknet_events_filters WHERE block_number = ?")?;

let mut emitted_events = Vec::new();

for block_number in from_block..=to_block {
Expand All @@ -118,12 +117,7 @@ pub(super) fn get_events(
tracing::trace!(%block_number, %events_required, "Processing block");

if !key_filter_is_empty || filter.contract_address.is_some() {
let bloom = bloom_stmt
.query_row(params![&block_number], |row| {
let bytes: Vec<u8> = row.get(0)?;
Ok(BloomFilter::from_compressed_bytes(&bytes))
})
.optional()?;
let bloom = load_bloom(tx, block_number)?;
let Some(bloom) = bloom else {
break;
};
Expand Down Expand Up @@ -214,6 +208,63 @@ pub(super) fn get_events(
})
}

fn load_bloom(
tx: &Transaction<'_>,
block_number: u64,
) -> Result<Option<BloomFilter>, EventFilterError> {
if let Some(bloom) = GLOBAL_CACHE.get(block_number)? {
return Ok(Some(bloom));
}

let mut stmt = tx
.inner()
.prepare_cached("SELECT bloom FROM starknet_events_filters WHERE block_number = ?")?;

let bloom = stmt
.query_row(params![&block_number], |row| {
let bytes: Vec<u8> = row.get(0)?;
Ok(BloomFilter::from_compressed_bytes(&bytes))
})
.optional()?;

if let Some(bloom) = &bloom {
GLOBAL_CACHE.set(block_number, bloom.clone())?;
}

Ok(bloom)
}

lazy_static::lazy_static! {
// FIXME: remove bloom filters when purging blocks
static ref GLOBAL_CACHE: BloomFilterCache = BloomFilterCache::new();
}

struct BloomFilterCache(Mutex<SizedCache<u64, BloomFilter>>);

impl BloomFilterCache {
fn new() -> Self {
Self(Mutex::new(SizedCache::with_size(512 * 1024)))
}

fn locked_cache(
&self,
) -> Result<MutexGuard<'_, SizedCache<u64, BloomFilter>>, EventFilterError> {
self.0.lock().map_err(|err| {
tracing::warn!("Bloom filter cache lock is poisoned. Cause: {}.", err);
EventFilterError::Internal(anyhow::anyhow!("Poisoned lock"))
})
}

pub fn get(&self, block_number: u64) -> Result<Option<BloomFilter>, EventFilterError> {
Ok(self.locked_cache()?.cache_get(&block_number).cloned())
}

pub fn set(&self, block_number: u64, bloom: BloomFilter) -> Result<(), EventFilterError> {
self.locked_cache()?.cache_set(block_number, bloom);
Ok(())
}
}

fn keys_in_bloom(bloom: &BloomFilter, keys: &[Vec<EventKey>]) -> bool {
keys.iter().enumerate().all(|(idx, keys)| {
if keys.is_empty() {
Expand Down

0 comments on commit 9c2a39d

Please sign in to comment.