diff --git a/.github/workflows/forest.yml b/.github/workflows/forest.yml index 39d3137c874e..c4bda0139cef 100644 --- a/.github/workflows/forest.yml +++ b/.github/workflows/forest.yml @@ -282,6 +282,32 @@ jobs: chmod +x ~/.cargo/bin/forest* - run: ./scripts/tests/calibnet_kademlia_check.sh timeout-minutes: '${{ fromJSON(env.SCRIPT_TIMEOUT_MINUTES) }}' + calibnet-eth-mapping-check: + needs: + - build-ubuntu + name: Calibnet eth mapping check + runs-on: ubuntu-latest + steps: + - run: lscpu + - uses: actions/cache@v4 + with: + path: '${{ env.FIL_PROOFS_PARAMETER_CACHE }}' + key: proof-params-keys + - name: Checkout Sources + uses: actions/checkout@v4 + - uses: actions/download-artifact@v4 + with: + name: 'forest-${{ runner.os }}' + path: ~/.cargo/bin + - uses: actions/download-artifact@v4 + with: + name: 'forest-${{ runner.os }}' + path: ~/.cargo/bin + - name: Set permissions + run: | + chmod +x ~/.cargo/bin/forest* + - run: ./scripts/tests/calibnet_eth_mapping_check.sh + timeout-minutes: '${{ fromJSON(env.SCRIPT_TIMEOUT_MINUTES) }}' db-migration-checks: needs: - build-ubuntu diff --git a/CHANGELOG.md b/CHANGELOG.md index dba9e7512d71..f745622ae013 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,9 @@ - [#4381](https://github.com/ChainSafe/forest/pull/4381) Add support for the `Filecoin.StateSectorPartition` RPC method. +- [#4368](https://github.com/ChainSafe/forest/issues/4368) Add support for the + `Filecoin.EthGetMessageCidByTransactionHash` RPC method. + - [#4167](https://github.com/ChainSafe/forest/issues/4167) Add support for the `Filecoin.EthGetBlockByHash` RPC method. diff --git a/scripts/tests/calibnet_eth_mapping_check.sh b/scripts/tests/calibnet_eth_mapping_check.sh new file mode 100755 index 000000000000..2e2d5f587d60 --- /dev/null +++ b/scripts/tests/calibnet_eth_mapping_check.sh @@ -0,0 +1,73 @@ +#!/usr/bin/env bash +# This script is checking the correctness of the ethereum mapping feature +# It requires both the `forest` and `forest-cli` binaries to be in the PATH. 
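+#
+# Outline:
+#   1. Walk the last $NUM_TIPSETS tipsets with `Filecoin.EthGetBlockByNumber` and
+#      collect the Ethereum block hashes and transaction hashes they report.
+#   2. Pass every block hash to `Filecoin.EthGetBalance` to verify that the
+#      hash -> tipset-key mapping is present.
+#   3. Pass every transaction hash to `Filecoin.EthGetMessageCidByTransactionHash`
+#      to verify that the hash -> CID mapping is present.
+# The script exits non-zero if any mapping is missing.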
+ +set -eu + +source "$(dirname "$0")/harness.sh" + +forest_init + +FOREST_URL='http://127.0.0.1:2345/rpc/v1' + +NUM_TIPSETS=600 + +echo "Get Ethereum block hashes and transactions hashes from the last $NUM_TIPSETS tipsets" + +OUTPUT=$($FOREST_CLI_PATH info show) + +HEAD_EPOCH=$(echo "$OUTPUT" | sed -n 's/.*epoch: \([0-9]*\).*/\1/p') +EPOCH=$((HEAD_EPOCH - 1)) + +ETH_BLOCK_HASHES=() +ETH_TX_HASHES=() + +for ((i=0; i<=NUM_TIPSETS; i++)); do + EPOCH_HEX=$(printf "0x%x" $EPOCH) + JSON=$(curl -s -X POST "$FOREST_URL" \ + -H 'Content-Type: application/json' \ + --data "$(jq -n --arg epoch "$EPOCH_HEX" '{jsonrpc: "2.0", id: 1, method: "Filecoin.EthGetBlockByNumber", params: [$epoch, false]}')") + + + HASH=$(echo "$JSON" | jq -r '.result.hash') + ETH_BLOCK_HASHES+=("$HASH") + + if [[ $(echo "$JSON" | jq -e '.result.transactions') != "null" ]]; then + TRANSACTIONS=$(echo "$JSON" | jq -r '.result.transactions[]') + for tx in $TRANSACTIONS; do + ETH_TX_HASHES+=("$tx") + done + else + echo "No transactions found for block hash: $EPOCH_HEX" + fi + + EPOCH=$((EPOCH - 1)) +done + +ERROR=0 +echo "Testing Ethereum mapping" + +for hash in "${ETH_BLOCK_HASHES[@]}"; do + JSON=$(curl -s -X POST "$FOREST_URL" \ + -H 'Content-Type: application/json' \ + --data "$(jq -n --arg hash "$hash" '{jsonrpc: "2.0", id: 1, method: "Filecoin.EthGetBalance", params: ["0xff38c072f286e3b20b3954ca9f99c05fbecc64aa", $hash]}')") + + if [[ $(echo "$JSON" | jq -e '.result') == "null" ]]; then + echo "Missing tipset key for hash $hash" + ERROR=1 + fi +done + +for hash in "${ETH_TX_HASHES[@]}"; do + JSON=$(curl -s -X POST "$FOREST_URL" \ + -H 'Content-Type: application/json' \ + --data "$(jq -n --arg hash "$hash" '{jsonrpc: "2.0", id: 1, method: "Filecoin.EthGetMessageCidByTransactionHash", params: [$hash]}')") + + if [[ $(echo "$JSON" | jq -e '.result') == "null" ]]; then + echo "Missing cid for hash $hash" + ERROR=1 + fi +done + +echo "Done" +exit $ERROR diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index f2876feebb1b..efa6f629a0cb 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -9,8 +9,8 @@ use crate::interpreter::BlockMessages; use crate::interpreter::VMTrace; use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; -use crate::networks::ChainConfig; -use crate::rpc::eth; +use crate::networks::{ChainConfig, Height}; +use crate::rpc::eth::{self, eth_tx_from_signed_eth_message}; use crate::shim::clock::ChainEpoch; use crate::shim::{ address::Address, econ::TokenAmount, executor::Receipt, message::Message, @@ -78,6 +78,9 @@ pub struct ChainStore { /// Ethereum mappings store eth_mappings: Arc, + + /// Needed by the Ethereum mapping. + chain_config: Arc, } impl BitswapStoreRead for ChainStore @@ -131,12 +134,13 @@ where let cs = Self { publisher, chain_index, - tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config), + tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()), db, settings, genesis_block_header, validated_blocks, eth_mappings, + chain_config, }; Ok(cs) @@ -162,10 +166,10 @@ where pub fn put_tipset(&self, ts: &Tipset) -> Result<(), Error> { persist_objects(self.blockstore(), ts.block_headers().iter())?; - self.put_tipset_key(ts.key())?; - // Expand tipset to include other compatible blocks at the epoch. 
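+        // Note: the key recorded below is the *expanded* tipset's key, so the
+        // Ethereum mapping and the heaviest-tipset update both cover every
+        // compatible block at this epoch.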
let expanded = self.expand_tipset(ts.min_ticket_block().clone())?; + self.put_tipset_key(expanded.key())?; + self.update_heaviest(Arc::new(expanded))?; Ok(()) } @@ -177,6 +181,20 @@ where Ok(()) } + /// Writes the delegated message `Cid`s to the blockstore for `EthAPI` queries. + pub fn put_delegated_message_hashes<'a>( + &self, + headers: impl Iterator, + ) -> Result<(), Error> { + tracing::debug!("persist eth mapping"); + + // The messages will be ordered from most recent block to less recent + let delegated_messages = self.headers_delegated_messages(headers)?; + + self.process_signed_messages(&delegated_messages)?; + Ok(()) + } + /// Reads the `TipsetKey` from the blockstore for `EthAPI` queries. pub fn get_required_tipset_key(&self, hash: ð::Hash) -> Result { let tsk = self @@ -188,12 +206,19 @@ where } /// Writes with timestamp the `Hash` to `Cid` mapping to the blockstore for `EthAPI` queries. - pub fn put_mapping(&self, k: eth::Hash, v: Cid) -> Result<(), Error> { - let timestamp = chrono::Utc::now().timestamp() as u64; + pub fn put_mapping(&self, k: eth::Hash, v: Cid, timestamp: u64) -> Result<(), Error> { self.eth_mappings.write_obj(&k, &(v, timestamp))?; Ok(()) } + /// Reads the `Cid` from the blockstore for `EthAPI` queries. + pub fn get_mapping(&self, hash: ð::Hash) -> Result, Error> { + Ok(self + .eth_mappings + .read_obj::<(Cid, u64)>(hash)? + .map(|(cid, _)| cid)) + } + /// Expands tipset to tipset with all other headers in the same epoch using /// the tipset tracker. fn expand_tipset(&self, header: CachingBlockHeader) -> Result { @@ -354,6 +379,88 @@ where .map_err(|e| Error::Other(format!("Could not get tipset from keys {e:?}")))?; Ok((lbts, *next_ts.parent_state())) } + + /// Filter [`SignedMessage`]'s to keep only the most recent ones, then write corresponding entries to the Ethereum mapping. + pub fn process_signed_messages(&self, messages: &[(SignedMessage, u64)]) -> anyhow::Result<()> + where + DB: fvm_ipld_blockstore::Blockstore, + { + let eth_txs: Vec<(eth::Hash, Cid, u64, usize)> = messages + .iter() + .enumerate() + .filter_map(|(i, (smsg, timestamp))| { + if let Ok(tx) = eth_tx_from_signed_eth_message(smsg, self.chain_config.eth_chain_id) + { + if let Ok(hash) = tx.eth_hash() { + // newest messages are the ones with lowest index + Some((hash, smsg.cid().unwrap(), *timestamp, i)) + } else { + None + } + } else { + None + } + }) + .collect(); + let filtered = filter_lowest_index(eth_txs); + let num_entries = filtered.len(); + + // write back + for (k, v, timestamp) in filtered.into_iter() { + tracing::trace!("Insert mapping {} => {}", k, v); + self.put_mapping(k, v, timestamp)?; + } + tracing::debug!("Wrote {} entries in Ethereum mapping", num_entries); + Ok(()) + } + + pub fn headers_delegated_messages<'a>( + &self, + headers: impl Iterator, + ) -> anyhow::Result> + where + DB: fvm_ipld_blockstore::Blockstore, + { + let mut delegated_messages = vec![]; + + // Hygge is the start of Ethereum support in the FVM (through the FEVM actor). + // Before this height, no notion of an Ethereum-like API existed. 
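+        // Headers from before Hygge therefore cannot carry delegated (Ethereum)
+        // messages and are skipped.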
+ let filtered_headers = + headers.filter(|bh| bh.epoch >= self.chain_config.epoch(Height::Hygge)); + + for bh in filtered_headers { + if let Ok((_, secp_cids)) = block_messages(self.blockstore(), bh) { + let mut messages: Vec<_> = secp_cids + .into_iter() + .filter(|msg| msg.is_delegated()) + .map(|m| (m, bh.timestamp)) + .collect(); + delegated_messages.append(&mut messages); + } + } + + Ok(delegated_messages) + } +} + +fn filter_lowest_index(values: Vec<(eth::Hash, Cid, u64, usize)>) -> Vec<(eth::Hash, Cid, u64)> { + let map: HashMap = values.into_iter().fold( + HashMap::default(), + |mut acc, (hash, cid, timestamp, index)| { + acc.entry(hash) + .and_modify(|&mut (_, _, ref mut min_index)| { + if index < *min_index { + *min_index = index; + } + }) + .or_insert((cid, timestamp, index)); + acc + }, + ); + + map.into_iter() + .map(|(hash, (cid, timestamp, _))| (hash, cid, timestamp)) + .collect() } /// Returns a Tuple of BLS messages of type `UnsignedMessage` and SECP messages diff --git a/src/chain_sync/chain_muxer.rs b/src/chain_sync/chain_muxer.rs index 19544ea67c3d..77887e54c7e8 100644 --- a/src/chain_sync/chain_muxer.rs +++ b/src/chain_sync/chain_muxer.rs @@ -511,6 +511,9 @@ where block.persist(&chain_store.db)?; } + // This is needed for the Ethereum mapping + chain_store.put_tipset_key(tipset.key())?; + // Update the peer head network .peer_manager() diff --git a/src/chain_sync/tipset_syncer.rs b/src/chain_sync/tipset_syncer.rs index ea61217749c2..115469a86684 100644 --- a/src/chain_sync/tipset_syncer.rs +++ b/src/chain_sync/tipset_syncer.rs @@ -761,7 +761,12 @@ async fn sync_tipset_range( return Err(why.into()); }; - // Sync and validate messages from the tipsets + // Persist tipset keys + for ts in parent_tipsets.iter() { + chain_store.put_tipset_key(ts.key())?; + } + + // Sync and validate messages from the tipsets tracker.write().set_stage(SyncStage::Messages); if let Err(why) = sync_messages_check_state( tracker.clone(), @@ -769,7 +774,7 @@ async fn sync_tipset_range( network, chain_store.clone(), &bad_block_cache, - parent_tipsets, + parent_tipsets.clone(), &genesis, InvalidBlockStrategy::Forgiving, ) @@ -780,6 +785,9 @@ async fn sync_tipset_range( return Err(why); }; + // Call only once messages persisted + chain_store.put_delegated_message_hashes(headers.into_iter())?; + // At this point the head is synced and it can be set in the store as the // heaviest debug!( @@ -961,6 +969,9 @@ async fn sync_tipset( proposed_head.block_headers().iter(), )?; + // Persist tipset key + chain_store.put_tipset_key(proposed_head.key())?; + // Sync and validate messages from the tipsets if let Err(e) = sync_messages_check_state( // Include a dummy WorkerState @@ -979,6 +990,9 @@ async fn sync_tipset( return Err(e); } + // Call only once messages persisted + chain_store.put_delegated_message_hashes(proposed_head.block_headers().iter())?; + // Add the tipset to the store. The tipset will be expanded with other blocks // with the same [epoch, parents] before updating the heaviest Tipset in // the store. diff --git a/src/cli_shared/cli/client.rs b/src/cli_shared/cli/client.rs index 7a424ec27493..0026d524c420 100644 --- a/src/cli_shared/cli/client.rs +++ b/src/cli_shared/cli/client.rs @@ -75,6 +75,8 @@ pub struct Client { pub token_exp: Duration, /// Load actors from the bundle file (possibly generating it if it doesn't exist) pub load_actors: bool, + /// `TTL` to set for Ethereum `Hash` to `Cid` entries or `None` to never reclaim them. 
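+    /// The value is a number of seconds; expired entries are reclaimed in the
+    /// background by the `EthMappingCollector` TTL task.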
+ pub eth_mapping_ttl: Option, } impl Default for Client { @@ -103,6 +105,7 @@ impl Default for Client { ), token_exp: Duration::try_seconds(5184000).expect("Infallible"), // 60 Days = 5184000 Seconds load_actors: true, + eth_mapping_ttl: None, } } } diff --git a/src/daemon/db_util.rs b/src/daemon/db_util.rs index 76541c09ddb7..60625651d092 100644 --- a/src/daemon/db_util.rs +++ b/src/daemon/db_util.rs @@ -2,19 +2,14 @@ // SPDX-License-Identifier: Apache-2.0, MIT use crate::blocks::Tipset; -use crate::chain::block_messages; use crate::cli_shared::snapshot; use crate::db::car::forest::FOREST_CAR_FILE_EXTENSION; use crate::db::car::{ForestCar, ManyCar}; -use crate::message::SignedMessage; use crate::networks::Height; -use crate::rpc::eth::{self, eth_tx_from_signed_eth_message}; use crate::state_manager::StateManager; use crate::utils::db::car_stream::CarStream; use crate::utils::io::EitherMmapOrRandomAccessFile; -use ahash::HashMap; use anyhow::Context as _; -use cid::Cid; use futures::TryStreamExt; use std::ffi::OsStr; use std::fs; @@ -34,6 +29,9 @@ use crate::rpc::eth::Hash; #[cfg(doc)] use crate::blocks::TipsetKey; +#[cfg(doc)] +use cid::Cid; + pub fn load_all_forest_cars(store: &ManyCar, forest_car_db_dir: &Path) -> anyhow::Result<()> { if !forest_car_db_dir.is_dir() { fs::create_dir_all(forest_car_db_dir)?; @@ -181,77 +179,20 @@ where if ts.epoch() < state_manager.chain_config().epoch(Height::Hygge) { break; } - for bh in ts.block_headers() { - if let Ok((_, secp_cids)) = block_messages(&state_manager.blockstore(), bh) { - let mut messages = secp_cids - .into_iter() - .filter(|msg| msg.is_delegated()) - .collect(); - delegated_messages.append(&mut messages); - } - } + delegated_messages.append( + &mut state_manager + .chain_store() + .headers_delegated_messages(ts.block_headers().iter())?, + ); state_manager.chain_store().put_tipset_key(ts.key())?; } - process_signed_messages(state_manager, &delegated_messages)?; + state_manager + .chain_store() + .process_signed_messages(&delegated_messages)?; Ok(()) } -/// Filter [`SignedMessage`]'s to keep only delegated ones and the most recent ones, then write them to the chain store. 
-fn process_signed_messages( - state_manager: &StateManager, - messages: &[SignedMessage], -) -> anyhow::Result<()> -where - DB: fvm_ipld_blockstore::Blockstore, -{ - let delegated_messages = messages.iter().filter(|msg| msg.is_delegated()); - let eth_chain_id = state_manager.chain_config().eth_chain_id; - - let eth_txs: Vec<(eth::Hash, Cid, usize)> = delegated_messages - .enumerate() - .filter_map(|(i, smsg)| { - if let Ok(tx) = eth_tx_from_signed_eth_message(smsg, eth_chain_id) { - if let Ok(hash) = tx.eth_hash() { - // newest messages are the ones with lowest index - Some((hash, smsg.cid().unwrap(), i)) - } else { - None - } - } else { - None - } - }) - .collect(); - let filtered = filter_lowest_index(eth_txs); - - // write back - for (k, v) in filtered.into_iter() { - state_manager.chain_store().put_mapping(k, v)?; - } - Ok(()) -} - -fn filter_lowest_index(values: Vec<(eth::Hash, Cid, usize)>) -> Vec<(eth::Hash, Cid)> { - let map: HashMap = - values - .into_iter() - .fold(HashMap::default(), |mut acc, (hash, cid, index)| { - acc.entry(hash) - .and_modify(|&mut (_, ref mut min_index)| { - if index < *min_index { - *min_index = index; - } - }) - .or_insert((cid, index)); - acc - }); - - map.into_iter() - .map(|(hash, (cid, _))| (hash, cid)) - .collect() -} - #[cfg(test)] mod test { use super::*; diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 1ed68771c2f3..30249c5d9416 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -20,7 +20,7 @@ use crate::daemon::db_util::{ }; use crate::db::car::ManyCar; use crate::db::db_engine::{db_root, open_db}; -use crate::db::MarkAndSweep; +use crate::db::{ttl::EthMappingCollector, MarkAndSweep}; use crate::genesis::{get_network_name_from_genesis, read_genesis_header}; use crate::key_management::{ KeyStore, KeyStoreConfig, ENCRYPTED_KEYSTORE_NAME, FOREST_KEYSTORE_PHRASE_ENV, @@ -262,6 +262,21 @@ pub(super) async fn start( services.spawn(async move { db_garbage_collector.gc_loop(GC_INTERVAL).await }); } + if let Some(ttl) = config.client.eth_mapping_ttl { + let chain_store = chain_store.clone(); + let chain_config = chain_config.clone(); + services.spawn(async move { + tracing::info!("Starting collector for eth_mappings"); + + let mut collector = EthMappingCollector::new( + chain_store.db.clone(), + chain_config.eth_chain_id, + Duration::from_secs(ttl.into()), + ); + collector.run().await + }); + } + let publisher = chain_store.publisher(); // Initialize StateManager diff --git a/src/db/car/many.rs b/src/db/car/many.rs index f4251a9bdda6..0e95c1587cf0 100644 --- a/src/db/car/many.rs +++ b/src/db/car/many.rs @@ -217,6 +217,14 @@ impl EthMappingsStore for ManyCar { fn exists(&self, key: ð::Hash) -> anyhow::Result { EthMappingsStore::exists(self.writer(), key) } + + fn get_message_cids(&self) -> anyhow::Result> { + EthMappingsStore::get_message_cids(self.writer()) + } + + fn delete(&self, keys: Vec) -> anyhow::Result<()> { + EthMappingsStore::delete(self.writer(), keys) + } } #[cfg(test)] diff --git a/src/db/memory.rs b/src/db/memory.rs index 003aca50bc7a..a36755567c2c 100644 --- a/src/db/memory.rs +++ b/src/db/memory.rs @@ -79,6 +79,25 @@ impl EthMappingsStore for MemoryDB { fn exists(&self, key: ð::Hash) -> anyhow::Result { Ok(self.eth_mappings_db.read().contains_key(key)) } + + fn get_message_cids(&self) -> anyhow::Result> { + let cids = self + .eth_mappings_db + .read() + .iter() + .filter_map(|(_, value)| fvm_ipld_encoding::from_slice::<(Cid, u64)>(value).ok()) + .collect(); + + Ok(cids) + } + + fn delete(&self, keys: Vec) -> 
anyhow::Result<()> { + let mut lock = self.eth_mappings_db.write(); + for hash in keys.iter() { + lock.remove(hash); + } + Ok(()) + } } impl Blockstore for MemoryDB { diff --git a/src/db/mod.rs b/src/db/mod.rs index 5a2bf9009417..9fb1935a4d5e 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -7,6 +7,7 @@ pub mod parity_db; pub mod parity_db_config; mod gc; +pub mod ttl; pub use gc::MarkAndSweep; pub use memory::MemoryDB; mod db_mode; @@ -14,6 +15,7 @@ pub mod migration; use crate::rpc::eth; use anyhow::Context as _; +use cid::Cid; use serde::de::DeserializeOwned; use serde::Serialize; use std::sync::Arc; @@ -104,6 +106,12 @@ pub trait EthMappingsStore { /// Returns `Ok(true)` if key exists in store. fn exists(&self, key: ð::Hash) -> anyhow::Result; + + /// Returns all message CIDs with their timestamp. + fn get_message_cids(&self) -> anyhow::Result>; + + /// Deletes `keys` if keys exist in store. + fn delete(&self, keys: Vec) -> anyhow::Result<()>; } impl EthMappingsStore for Arc { @@ -118,6 +126,14 @@ impl EthMappingsStore for Arc { fn exists(&self, key: ð::Hash) -> anyhow::Result { EthMappingsStore::exists(self.as_ref(), key) } + + fn get_message_cids(&self) -> anyhow::Result> { + EthMappingsStore::get_message_cids(self.as_ref()) + } + + fn delete(&self, keys: Vec) -> anyhow::Result<()> { + EthMappingsStore::delete(self.as_ref(), keys) + } } pub trait EthMappingsStoreExt { diff --git a/src/db/parity_db.rs b/src/db/parity_db.rs index d39ca7a43bfa..2a47271ad1cf 100644 --- a/src/db/parity_db.rs +++ b/src/db/parity_db.rs @@ -191,6 +191,27 @@ impl EthMappingsStore for ParityDb { .map(|size| size.is_some()) .context("error checking if key exists") } + + fn get_message_cids(&self) -> anyhow::Result> { + let mut cids = Vec::new(); + + self.db + .iter_column_while(DbColumn::EthMappings as u8, |val| { + if let Ok(value) = fvm_ipld_encoding::from_slice::<(Cid, u64)>(&val.value) { + cids.push(value); + } + true + })?; + + Ok(cids) + } + + fn delete(&self, keys: Vec) -> anyhow::Result<()> { + Ok(self.db.commit_changes(keys.into_iter().map(|key| { + let bytes = key.0.as_bytes().to_vec(); + (DbColumn::EthMappings as u8, Operation::Dereference(bytes)) + }))?) + } } impl Blockstore for ParityDb { diff --git a/src/db/tests/db_utils/parity.rs b/src/db/tests/db_utils/parity.rs index eb0229ff7fda..dd7439bcd508 100644 --- a/src/db/tests/db_utils/parity.rs +++ b/src/db/tests/db_utils/parity.rs @@ -7,7 +7,7 @@ use crate::db::{parity_db::ParityDb, parity_db_config::ParityDbConfig}; /// Temporary, self-cleaning ParityDB pub struct TempParityDB { - db: Option, + pub db: Option, _dir: tempfile::TempDir, // kept for cleaning up during Drop } diff --git a/src/db/ttl/mod.rs b/src/db/ttl/mod.rs new file mode 100644 index 000000000000..92108ae918b4 --- /dev/null +++ b/src/db/ttl/mod.rs @@ -0,0 +1,164 @@ +// Copyright 2019-2024 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::message::ChainMessage; +use crate::rpc::eth::{self, eth_tx_from_signed_eth_message}; +use fvm_ipld_blockstore::Blockstore; +use std::sync::Arc; +use std::time::Duration; + +use super::EthMappingsStore; + +pub struct EthMappingCollector { + db: Arc, + eth_chain_id: u32, + ttl: std::time::Duration, +} + +impl EthMappingCollector { + /// Creates a `TTL` collector for the Ethereum mapping. 
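+    /// Entries whose stored timestamp is older than `ttl` are deleted on each
+    /// pass of [`Self::ttl_workflow`].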
+ /// + pub fn new(db: Arc, eth_chain_id: u32, ttl: Duration) -> Self { + Self { + db, + eth_chain_id, + ttl, + } + } + + /// Remove keys whose `(duration - timestamp) > TTL` from the database + /// where `duration` is the elapsed time since "UNIX timestamp". + fn ttl_workflow(&self, duration: Duration) -> anyhow::Result<()> { + let keys: Vec = self + .db + .get_message_cids()? + .iter() + .filter(|(_, timestamp)| { + duration.saturating_sub(Duration::from_secs(*timestamp)) > self.ttl + }) + .filter_map(|(cid, _)| { + let message = crate::chain::get_chain_message(self.db.as_ref(), cid); + if let Ok(ChainMessage::Signed(smsg)) = message { + let result = eth_tx_from_signed_eth_message(&smsg, self.eth_chain_id); + if let Ok(tx) = result { + tx.eth_hash().ok() + } else { + None + } + } else { + None + } + }) + .collect(); + + for h in keys.iter() { + tracing::trace!("Marked {} for deletion", h); + } + let count = keys.len(); + self.db.delete(keys)?; + + tracing::debug!( + "Found and deleted {count} mappings older than {:?}", + self.ttl + ); + + Ok(()) + } + + pub async fn run(&mut self) -> anyhow::Result<()> { + loop { + tokio::time::sleep(self.ttl).await; + + let duration = Duration::from_secs(chrono::Utc::now().timestamp() as u64); + self.ttl_workflow(duration)?; + } + } +} + +#[cfg(test)] +mod test { + use std::convert::TryFrom; + + use chrono::{DateTime, TimeZone, Utc}; + use cid::Cid; + + use crate::chain_sync::TipsetValidator; + use crate::db::EthMappingsStore; + use crate::db::EthMappingsStoreExt; + use crate::db::MemoryDB; + use crate::networks::calibnet::ETH_CHAIN_ID; + use crate::test_utils::construct_eth_messages; + + const ZERO_DURATION: Duration = Duration::from_secs(0); + const EPS_DURATION: Duration = Duration::from_secs(1); + const TTL_DURATION: Duration = Duration::from_secs(60); + + use super::*; + + #[tokio::test] + async fn test_ttl() { + let eth_chain_id: u32 = ETH_CHAIN_ID.try_into().unwrap(); + + let blockstore = Arc::new(MemoryDB::default()); + + let (bls0, secp0) = construct_eth_messages(0); + let (bls1, secp1) = construct_eth_messages(1); + + crate::chain::persist_objects(&blockstore, [bls0.clone(), bls1.clone()].iter()).unwrap(); + crate::chain::persist_objects(&blockstore, [secp0.clone(), secp1.clone()].iter()).unwrap(); + + let expected_root = + Cid::try_from("bafy2bzacebqzqoow32yddtu746myprecdtblty77f3k6at6v2axkhvqd3iwvi") + .unwrap(); + + let root = TipsetValidator::compute_msg_root( + &blockstore, + &[bls0.clone(), bls1.clone()], + &[secp0.clone(), secp1.clone()], + ) + .expect("Computing message root should succeed"); + assert_eq!(root, expected_root); + + // Unix epoch corresponds to 1970-01-01 00:00:00 UTC + let unix_timestamp: DateTime = Utc.timestamp_opt(0, 0).unwrap(); + + // Add key0 with unix epoch + let tx0 = eth_tx_from_signed_eth_message(&secp0, eth_chain_id).unwrap(); + let key0 = tx0.eth_hash().unwrap(); + + let timestamp = unix_timestamp.timestamp() as u64; + blockstore + .write_obj(&key0, &(secp0.cid().unwrap(), timestamp)) + .unwrap(); + + assert!(blockstore.exists(&key0).unwrap()); + + // Add key1 with unix epoch + 2 * ttl + let tx1 = eth_tx_from_signed_eth_message(&secp1, eth_chain_id).unwrap(); + let key1 = tx1.eth_hash().unwrap(); + + blockstore + .write_obj( + &key1, + &( + secp1.cid().unwrap(), + unix_timestamp.timestamp() as u64 + 2 * TTL_DURATION.as_secs(), + ), + ) + .unwrap(); + + assert!(blockstore.exists(&key1).unwrap()); + + let collector = EthMappingCollector::new(blockstore.clone(), eth_chain_id, TTL_DURATION); + + 
collector.ttl_workflow(ZERO_DURATION).unwrap(); + + assert!(blockstore.exists(&key0).unwrap()); + assert!(blockstore.exists(&key1).unwrap()); + + collector.ttl_workflow(TTL_DURATION + EPS_DURATION).unwrap(); + + assert!(!blockstore.exists(&key0).unwrap()); + assert!(blockstore.exists(&key1).unwrap()); + } +} diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 0acbd7c11ac8..47834c6a2f82 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -80,7 +80,7 @@ enum EAMMethod { } #[repr(u64)] -enum EVMMethod { +pub enum EVMMethod { // it is very unfortunate but the hasher creates a circular dependency, so we use the raw // number. // InvokeContract = frc42_dispatch::method_hash!("InvokeEVM"), @@ -157,6 +157,16 @@ lotus_json_with_self!(Int64); pub struct Hash(#[schemars(with = "String")] pub ethereum_types::H256); impl Hash { + // Should ONLY be used for blocks and Filecoin messages. Eth transactions expect a different hashing scheme. + pub fn to_cid(&self) -> cid::Cid { + use cid::multihash::MultihashDigest; + + let mh = cid::multihash::Code::Blake2b256 + .wrap(self.0.as_bytes()) + .expect("should not fail"); + Cid::new_v1(fvm_ipld_encoding::DAG_CBOR, mh) + } + pub fn empty_uncles() -> Self { Self(ethereum_types::H256::from_str(EMPTY_UNCLES).unwrap()) } @@ -1258,6 +1268,51 @@ impl RpcMethod<1> for EthGetBlockTransactionCountByNumber { } } +pub enum EthGetMessageCidByTransactionHash {} +impl RpcMethod<1> for EthGetMessageCidByTransactionHash { + const NAME: &'static str = "Filecoin.EthGetMessageCidByTransactionHash"; + const PARAM_NAMES: [&'static str; 1] = ["tx_hash"]; + const API_VERSION: ApiVersion = ApiVersion::V1; + const PERMISSION: Permission = Permission::Read; + + type Params = (Hash,); + type Ok = Option; + + async fn handle( + ctx: Ctx, + (tx_hash,): Self::Params, + ) -> Result { + let result = ctx.chain_store.get_mapping(&tx_hash); + match result { + Ok(Some(cid)) => return Ok(Some(cid)), + Ok(None) => tracing::debug!("Undefined key {tx_hash}"), + _ => { + result?; + } + } + + // This isn't an eth transaction we have the mapping for, so let's try looking it up as a filecoin message + let cid = tx_hash.to_cid(); + + let result: Result, crate::chain::Error> = + crate::chain::messages_from_cids(ctx.chain_store.blockstore(), &[cid]); + if result.is_ok() { + // This is an Eth Tx, Secp message, Or BLS message in the mpool + return Ok(Some(cid)); + } + + let result: Result, crate::chain::Error> = + crate::chain::messages_from_cids(ctx.chain_store.blockstore(), &[cid]); + if result.is_ok() { + // This is a BLS message + return Ok(Some(cid)); + } + + // Ethereum clients expect an empty response when the message was not found + Ok(None) + } +} + fn count_messages_in_tipset(store: &impl Blockstore, ts: &Tipset) -> anyhow::Result { let mut message_cids = CidHashSet::default(); for block in ts.block_headers() { @@ -1447,12 +1502,20 @@ impl RpcMethod<3> for EthGetStorageAt { #[cfg(test)] mod test { use super::*; - use ethereum_types::H160; + use ethereum_types::{H160, H256}; use num_bigint; use num_traits::{FromBytes, Signed}; + use quickcheck::Arbitrary; use quickcheck_macros::quickcheck; use std::num::ParseIntError; + impl Arbitrary for Hash { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + let arr: [u8; 32] = std::array::from_fn(|_ix| u8::arbitrary(g)); + Self(H256(arr)) + } + } + #[quickcheck] fn gas_price_result_serde_roundtrip(i: u128) { let r = EthBigInt(i.into()); @@ -1626,6 +1689,29 @@ mod test { } } + #[test] + fn test_hash() { + let test_cases = [ + 
r#""0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184""#, + r#""0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738""#, + ]; + + for hash in test_cases { + let h: Hash = serde_json::from_str(hash).unwrap(); + + let c = h.to_cid(); + let h1: Hash = c.into(); + assert_eq!(h, h1); + } + } + + #[quickcheck] + fn test_eth_hash_roundtrip(eth_hash: Hash) { + let cid = eth_hash.to_cid(); + let hash = cid.into(); + assert_eq!(eth_hash, hash); + } + #[test] fn test_block_constructor() { let block = Block::new(false, 1); diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 4033b97d57af..c796951663c4 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -79,6 +79,7 @@ macro_rules! for_each_method { $callback!(crate::rpc::eth::EthGetBlockByNumber); $callback!(crate::rpc::eth::EthGetBlockTransactionCountByHash); $callback!(crate::rpc::eth::EthGetBlockTransactionCountByNumber); + $callback!(crate::rpc::eth::EthGetMessageCidByTransactionHash); // gas vertical $callback!(crate::rpc::gas::GasEstimateGasLimit); diff --git a/src/test_utils/mod.rs b/src/test_utils/mod.rs index 77b0b74559fa..4c3a11ddbbc0 100644 --- a/src/test_utils/mod.rs +++ b/src/test_utils/mod.rs @@ -1,12 +1,15 @@ // Copyright 2019-2024 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use std::str::FromStr; + use crate::blocks::Ticket; use crate::blocks::VRFProof; use crate::message::SignedMessage; +use crate::rpc::eth::EVMMethod; use crate::shim::{ address::Address, - crypto::Signature, + crypto::{Signature, SignatureType, SECP_SIG_LEN}, message::{Message, Message_v3}, }; use base64::{prelude::BASE64_STANDARD, Engine}; @@ -31,6 +34,29 @@ pub fn construct_messages() -> (Message, SignedMessage) { (bls_messages, secp_messages) } +/// Returns a tuple of unsigned and signed messages used for testing the Ethereum mapping +pub fn construct_eth_messages(sequence: u64) -> (Message, SignedMessage) { + let mut bls_message: Message = Message_v3 { + to: Address::from_str("t410foy6ucbmuujaequ3zsdo6nsubyogp6vtk23t4odq") + .unwrap() + .into(), + from: Address::from_str("t410fse4uvumo6ko46igb6lshg3peztqs3h6755vommy") + .unwrap() + .into(), + ..Message_v3::default() + } + .into(); + bls_message.method_num = EVMMethod::InvokeContract as u64; + bls_message.sequence = sequence; + + let secp_message = SignedMessage::new_unchecked( + bls_message.clone(), + Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]), + ); + + (bls_message, secp_message) +} + // Serialize macro used for testing #[macro_export] macro_rules! to_string_with { diff --git a/src/tool/subcommands/api_cmd.rs b/src/tool/subcommands/api_cmd.rs index fd93432cefb1..633a319d191b 100644 --- a/src/tool/subcommands/api_cmd.rs +++ b/src/tool/subcommands/api_cmd.rs @@ -64,6 +64,8 @@ use tracing::{debug, info, warn}; const COLLECTION_SAMPLE_SIZE: usize = 5; +const CALIBNET_CHAIN_ID: u32 = crate::networks::calibnet::ETH_CHAIN_ID as u32; + #[derive(Debug, Subcommand)] #[allow(clippy::large_enum_variant)] pub enum ApiCommands { @@ -123,6 +125,9 @@ pub enum ApiCommands { /// Worker address to use where key is applicable. Worker key must be in the key-store. #[arg(long)] worker_address: Option
<Address>,
+        /// Ethereum chain ID. Defaults to the calibnet chain ID.
+        #[arg(long, default_value_t = CALIBNET_CHAIN_ID)]
+        eth_chain_id: u32,
     },
 }
@@ -137,6 +142,7 @@ struct ApiTestFlags {
     max_concurrent_requests: usize,
     miner_address: Option<Address>,
     worker_address: Option<Address>
, + eth_chain_id: u32, } impl ApiCommands { @@ -164,6 +170,7 @@ impl ApiCommands { max_concurrent_requests, miner_address, worker_address, + eth_chain_id, } => { let config = ApiTestFlags { filter, @@ -174,6 +181,7 @@ impl ApiCommands { max_concurrent_requests, miner_address, worker_address, + eth_chain_id, }; compare_apis( @@ -1195,6 +1203,35 @@ fn eth_tests_with_tipset(shared_tipset: &Tipset) -> Vec { ] } +fn eth_state_tests_with_tipset( + store: &Arc, + shared_tipset: &Tipset, + eth_chain_id: u32, +) -> anyhow::Result> { + let mut tests = vec![]; + + for block in shared_tipset.block_headers() { + let state = StateTree::new_from_root(store.clone(), shared_tipset.parent_state())?; + + let (bls_messages, secp_messages) = crate::chain::store::block_messages(store, block)?; + for smsg in sample_signed_messages(bls_messages.iter(), secp_messages.iter()) { + let tx = new_eth_tx_from_signed_message(&smsg, &state, eth_chain_id)?; + tests.push(RpcTest::identity( + EthGetMessageCidByTransactionHash::request((tx.hash,)).unwrap(), + )); + } + } + tests.push(RpcTest::identity( + EthGetMessageCidByTransactionHash::request((Hash::from_str( + "0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355f", + ) + .unwrap(),)) + .unwrap(), + )); + + Ok(tests) +} + fn gas_tests_with_tipset(shared_tipset: &Tipset) -> Vec { // This is a testnet address with a few FILs. The private key has been // discarded. If calibnet is reset, a new address should be created. @@ -1240,6 +1277,11 @@ fn snapshot_tests(store: Arc, config: &ApiTestFlags) -> anyhow::Result< tests.extend(state_tests_with_tipset(&store, &tipset)?); tests.extend(eth_tests_with_tipset(&tipset)); tests.extend(gas_tests_with_tipset(&tipset)); + tests.extend(eth_state_tests_with_tipset( + &store, + &tipset, + config.eth_chain_id, + )?); } Ok(tests) } @@ -1277,6 +1319,21 @@ fn sample_messages<'a>( .unique() } +fn sample_signed_messages<'a>( + bls_messages: impl Iterator + 'a, + secp_messages: impl Iterator + 'a, +) -> impl Iterator + 'a { + bls_messages + .unique() + .take(COLLECTION_SAMPLE_SIZE) + .map(|msg| { + let sig = Signature::new_bls(vec![]); + SignedMessage::new_unchecked(msg.clone(), sig) + }) + .chain(secp_messages.cloned().unique().take(COLLECTION_SAMPLE_SIZE)) + .unique() +} + /// Compare two RPC providers. The providers are labeled `forest` and `lotus`, /// but other nodes may be used (such as `venus`). The `lotus` node is assumed /// to be correct and the `forest` node will be marked as incorrect if it
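For reference, the new method can also be exercised by hand in the same way the test script above does it. This is only an illustrative sketch: it reuses the local RPC endpoint from scripts/tests/calibnet_eth_mapping_check.sh and the calibnet transaction hash from the api_cmd.rs tests, so substitute your own node URL and hash as needed.

# Query the message CID behind an Ethereum transaction hash via JSON-RPC.
curl -s -X POST 'http://127.0.0.1:2345/rpc/v1' \
  -H 'Content-Type: application/json' \
  --data '{"jsonrpc":"2.0","id":1,"method":"Filecoin.EthGetMessageCidByTransactionHash","params":["0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355f"]}'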