Skip to content

Commit

Permalink
Merge pull request #1730 from eqlabs/mirko/trace-coalescing
Browse files Browse the repository at this point in the history
feat(rpc): trace request coalescing
  • Loading branch information
sistemd authored Feb 13, 2024
2 parents 86a1bfa + f6024fb commit 43b36fe
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ pathfinder-storage = { path = "../storage" }
primitive-types = { workspace = true, features = ["serde"] }
starknet-gateway-types = { path = "../gateway-types" }
starknet_api = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
47 changes: 36 additions & 11 deletions crates/executor/src/simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
sync::{Arc, Mutex},
};

use anyhow::Context;
use blockifier::{
state::{cached_state::CachedState, errors::StateError, state_api::State},
transaction::transaction_execution::Transaction,
Expand Down Expand Up @@ -31,8 +32,14 @@ use super::{
types::{FeeEstimate, TransactionSimulation, TransactionTrace},
};

#[derive(Debug)]
enum CacheItem {
Inflight(tokio::sync::broadcast::Receiver<Traces>),
Cached(Traces),
}

#[derive(Debug, Clone)]
pub struct TraceCache(Arc<Mutex<SizedCache<BlockHash, Traces>>>);
pub struct TraceCache(Arc<Mutex<SizedCache<BlockHash, CacheItem>>>);

type Traces = Vec<(TransactionHash, TransactionTrace)>;

Expand Down Expand Up @@ -126,11 +133,29 @@ pub fn trace(
) -> Result<Vec<(TransactionHash, TransactionTrace)>, TransactionExecutionError> {
let (mut state, block_context) = execution_state.starknet_state()?;

let cached = { cache.0.lock().unwrap().cache_get(&block_hash).cloned() };
if let Some(cached) = cached {
tracing::trace!(block=%block_hash, "trace cache hit");
return Ok(cached);
}
let sender = {
let mut cache = cache.0.lock().unwrap();
match cache.cache_get(&block_hash) {
Some(CacheItem::Cached(cached)) => {
tracing::trace!(block=%block_hash, "trace cache hit");
return Ok(cached.clone());
}
Some(CacheItem::Inflight(receiver)) => {
tracing::trace!(block=%block_hash, "trace already inflight");
let mut receiver = receiver.resubscribe();
drop(cache);

let trace = receiver.blocking_recv().context("Trace error")?;
return Ok(trace);
}
None => {
tracing::trace!(block=%block_hash, "trace cache miss");
let (sender, receiver) = tokio::sync::broadcast::channel(1);
cache.cache_set(block_hash, CacheItem::Inflight(receiver));
sender
}
}
};

tracing::trace!(block=%block_hash, "trace cache miss");
let mut traces = Vec::with_capacity(transactions.len());
Expand All @@ -154,11 +179,11 @@ pub fn trace(
let trace = to_trace(tx_type, tx_info, state_diff);
traces.push((hash, trace));
}
cache
.0
.lock()
.unwrap()
.cache_set(block_hash, traces.clone());

// Lock the cache before sending to avoid race conditions between senders and receivers.
let mut cache = cache.0.lock().unwrap();
let _ = sender.send(traces.clone());
cache.cache_set(block_hash, CacheItem::Cached(traces.clone()));
Ok(traces)
}

Expand Down
34 changes: 31 additions & 3 deletions crates/rpc/src/v06/method/trace_block_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ use pathfinder_common::transaction::Transaction;

use super::simulate_transactions::dto::TransactionTrace;

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct TraceBlockTransactionsInput {
block_id: BlockId,
}

#[derive(Debug, Serialize, Eq, PartialEq)]
#[derive(Debug, Serialize, Eq, PartialEq, Clone)]
pub struct Trace {
pub transaction_hash: TransactionHash,
pub trace_root: TransactionTrace,
}

#[derive(Debug, Serialize, Eq, PartialEq)]
#[derive(Debug, Serialize, Eq, PartialEq, Clone)]
pub struct TraceBlockTransactionsOutput(pub Vec<Trace>);

#[derive(Debug)]
Expand Down Expand Up @@ -275,6 +275,7 @@ pub(crate) mod tests {
use pathfinder_common::{
block_hash, felt, receipt::Receipt, BlockHeader, GasPrice, SierraHash, TransactionIndex,
};
use tokio::task::JoinSet;

use super::*;

Expand Down Expand Up @@ -391,6 +392,33 @@ pub(crate) mod tests {
Ok(())
}

/// Test that multiple requests for the same block return correctly. This checks that the
/// trace request coalescing doesn't do anything unexpected.
#[tokio::test]
async fn test_request_coalescing() -> anyhow::Result<()> {
const NUM_REQUESTS: usize = 1000;

let (context, next_block_header, traces) = setup_multi_tx_trace_test().await?;

let input = TraceBlockTransactionsInput {
block_id: next_block_header.hash.into(),
};
let mut joins = JoinSet::new();
for _ in 0..NUM_REQUESTS {
let input = input.clone();
let context = context.clone();
joins.spawn(async move { trace_block_transactions(context, input).await.unwrap() });
}
let mut outputs = Vec::new();
while let Some(output) = joins.join_next().await {
outputs.push(output.unwrap());
}
let expected = vec![TraceBlockTransactionsOutput(traces); NUM_REQUESTS];

pretty_assertions_sorted::assert_eq!(outputs, expected);
Ok(())
}

pub(crate) async fn setup_multi_tx_trace_pending_test(
) -> anyhow::Result<(RpcContext, Vec<Trace>)> {
use super::super::simulate_transactions::tests::fixtures;
Expand Down

0 comments on commit 43b36fe

Please sign in to comment.