Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Handle removed logs in filter changes and add geth compatibility field (
Browse files Browse the repository at this point in the history
#8796)

* Add removed geth compatibility field in log

* Fix mocked tests

* Add field block hash in PollFilter

* Store last block hash info for log filters

* Implement canon route

* Use canon logs for fetching reorg logs

Light client removed logs fetching is disabled. It looks expensive.

* Make sure removed flag is set

* Address grumbles
  • Loading branch information
sorpaas authored and andresilva committed Jun 18, 2018
1 parent 3ba4fe2 commit 75cff42
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 21 deletions.
4 changes: 2 additions & 2 deletions rpc/src/v1/helpers/poll_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ pub enum PollFilter {
Block(BlockNumber),
/// Hashes of all transactions which client was notified about.
PendingTransaction(Vec<H256>),
/// Number of From block number, pending logs and log filter itself.
Logs(BlockNumber, HashSet<Log>, Filter)
/// Number of From block number, last seen block hash, pending logs and log filter itself.
Logs(BlockNumber, Option<H256>, HashSet<Log>, Filter)
}

/// Returns only last `n` logs
Expand Down
61 changes: 54 additions & 7 deletions rpc/src/v1/impls/eth_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub trait Filterable {
fn best_block_number(&self) -> u64;

/// Get a block hash by block id.
fn block_hash(&self, id: BlockId) -> Option<RpcH256>;
fn block_hash(&self, id: BlockId) -> Option<H256>;

/// pending transaction hashes at the given block.
fn pending_transactions_hashes(&self) -> Vec<H256>;
Expand All @@ -52,6 +52,9 @@ pub trait Filterable {

/// Get a reference to the poll manager.
fn polls(&self) -> &Mutex<PollManager<PollFilter>>;

/// Get removed logs within route from the given block to the nearest canon block, not including the canon block. Also returns how many logs have been traversed.
fn removed_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec<Log>, u64);
}

/// Eth filter rpc implementation for a full node.
Expand Down Expand Up @@ -80,8 +83,8 @@ impl<C, M> Filterable for EthFilterClient<C, M> where
self.client.chain_info().best_block_number
}

fn block_hash(&self, id: BlockId) -> Option<RpcH256> {
self.client.block_hash(id).map(Into::into)
fn block_hash(&self, id: BlockId) -> Option<H256> {
self.client.block_hash(id)
}

fn pending_transactions_hashes(&self) -> Vec<H256> {
Expand All @@ -100,6 +103,40 @@ impl<C, M> Filterable for EthFilterClient<C, M> where
}

fn polls(&self) -> &Mutex<PollManager<PollFilter>> { &self.polls }

fn removed_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec<Log>, u64) {
let inner = || -> Option<Vec<H256>> {
let mut route = Vec::new();

let mut current_block_hash = block_hash;
let mut current_block_header = self.client.block_header(BlockId::Hash(current_block_hash))?;

while current_block_hash != self.client.block_hash(BlockId::Number(current_block_header.number()))? {
route.push(current_block_hash);

current_block_hash = current_block_header.parent_hash();
current_block_header = self.client.block_header(BlockId::Hash(current_block_hash))?;
}

Some(route)
};

let route = inner().unwrap_or_default();
let route_len = route.len() as u64;
(route.into_iter().flat_map(|block_hash| {
let mut filter = filter.clone();
filter.from_block = BlockId::Hash(block_hash);
filter.to_block = filter.from_block;

self.client.logs(filter).into_iter().map(|log| {
let mut log: Log = log.into();
log.log_type = "removed".into();
log.removed = true;

log
})
}).collect(), route_len)
}
}


Expand All @@ -108,7 +145,7 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
fn new_filter(&self, filter: Filter) -> Result<RpcU256> {
let mut polls = self.polls().lock();
let block_number = self.best_block_number();
let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter));
let id = polls.create_poll(PollFilter::Logs(block_number, None, Default::default(), filter));
Ok(id.into())
}

Expand Down Expand Up @@ -136,7 +173,7 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
let current_number = self.best_block_number() + 1;
let hashes = (*block_number..current_number).into_iter()
.map(BlockId::Number)
.filter_map(|id| self.block_hash(id))
.filter_map(|id| self.block_hash(id).map(Into::into))
.collect::<Vec<RpcH256>>();

*block_number = current_number;
Expand Down Expand Up @@ -166,7 +203,7 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
// return new hashes
Either::A(future::ok(FilterChanges::Hashes(new_hashes)))
},
PollFilter::Logs(ref mut block_number, ref mut previous_logs, ref filter) => {
PollFilter::Logs(ref mut block_number, ref mut last_block_hash, ref mut previous_logs, ref filter) => {
// retrive the current block number
let current_number = self.best_block_number();

Expand All @@ -175,6 +212,11 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {

// build appropriate filter
let mut filter: EthcoreFilter = filter.clone().into();

// retrieve reorg logs
let (mut reorg, reorg_len) = last_block_hash.map_or_else(|| (Vec::new(), 0), |h| self.removed_logs(h, &filter));
*block_number -= reorg_len as u64;

filter.from_block = BlockId::Number(*block_number);
filter.to_block = BlockId::Latest;

Expand All @@ -200,9 +242,14 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
// we want to get logs
*block_number = current_number + 1;

// save the current block hash, which we used to get back to the
// canon chain in case of reorg.
*last_block_hash = self.block_hash(BlockId::Number(current_number));

// retrieve logs in range from_block..min(BlockId::Latest..to_block)
let limit = filter.limit;
Either::B(self.logs(filter)
.map(move |logs| { reorg.extend(logs); reorg }) // append reorg logs in the front
.map(move |mut logs| { logs.extend(pending); logs }) // append fetched pending logs
.map(move |logs| limit_logs(logs, limit)) // limit the logs
.map(FilterChanges::Logs))
Expand All @@ -216,7 +263,7 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
let mut polls = self.polls().lock();

match polls.poll(&index.value()) {
Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => filter.clone(),
Some(&PollFilter::Logs(ref _block_number, ref _last_block_hash, ref _previous_log, ref filter)) => filter.clone(),
// just empty array
Some(_) => return Box::new(future::ok(Vec::new())),
None => return Box::new(future::err(errors::filter_not_found())),
Expand Down
1 change: 1 addition & 0 deletions rpc/src/v1/impls/eth_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ impl<C: BlockChainClient> ChainNotify for ChainNotificationHandler<C> {
&ChainRouteType::Retracted =>
Ok(self.client.logs(filter).into_iter().map(Into::into).map(|mut log: Log| {
log.log_type = "removed".into();
log.removed = true;
log
}).collect()),
}
Expand Down
8 changes: 6 additions & 2 deletions rpc/src/v1/impls/light/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,8 @@ impl<T: LightChainClient + 'static> Eth for EthClient<T> {
impl<T: LightChainClient + 'static> Filterable for EthClient<T> {
fn best_block_number(&self) -> u64 { self.client.chain_info().best_block_number }

fn block_hash(&self, id: BlockId) -> Option<RpcH256> {
self.client.block_hash(id).map(Into::into)
fn block_hash(&self, id: BlockId) -> Option<::ethereum_types::H256> {
self.client.block_hash(id)
}

fn pending_transactions_hashes(&self) -> Vec<::ethereum_types::H256> {
Expand All @@ -548,6 +548,10 @@ impl<T: LightChainClient + 'static> Filterable for EthClient<T> {
fn polls(&self) -> &Mutex<PollManager<PollFilter>> {
&self.polls
}

fn removed_logs(&self, _block_hash: ::ethereum_types::H256, _filter: &EthcoreFilter) -> (Vec<Log>, u64) {
(Default::default(), 0)
}
}

fn extract_uncle_at_index<T: LightChainClient>(block: encoded::Block, index: Index, client: Arc<T>) -> Option<RichBlock> {
Expand Down
10 changes: 5 additions & 5 deletions rpc/src/v1/tests/mocked/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ fn rpc_eth_logs() {
let request2 = r#"{"jsonrpc": "2.0", "method": "eth_getLogs", "params": [{"limit":1}], "id": 1}"#;
let request3 = r#"{"jsonrpc": "2.0", "method": "eth_getLogs", "params": [{"limit":0}], "id": 1}"#;

let response1 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x0","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"},{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response2 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response1 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x0","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"},{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response2 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response3 = r#"{"jsonrpc":"2.0","result":[],"id":1}"#;

assert_eq!(tester.io.handle_request_sync(request1), Some(response1.to_owned()));
Expand Down Expand Up @@ -276,8 +276,8 @@ fn rpc_logs_filter() {

let request_changes1 = r#"{"jsonrpc": "2.0", "method": "eth_getFilterChanges", "params": ["0x0"], "id": 1}"#;
let request_changes2 = r#"{"jsonrpc": "2.0", "method": "eth_getFilterChanges", "params": ["0x1"], "id": 1}"#;
let response1 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x0","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"},{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response2 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response1 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x0","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"},{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response2 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;

assert_eq!(tester.io.handle_request_sync(request_changes1), Some(response1.to_owned()));
assert_eq!(tester.io.handle_request_sync(request_changes2), Some(response2.to_owned()));
Expand Down Expand Up @@ -1045,7 +1045,7 @@ fn rpc_eth_transaction_receipt() {
"params": ["0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238"],
"id": 1
}"#;
let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","contractAddress":null,"cumulativeGasUsed":"0x20","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","data":"0x","logIndex":"0x1","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"}],"logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","root":"0x0000000000000000000000000000000000000000000000000000000000000000","status":null,"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0"},"id":1}"#;
let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","contractAddress":null,"cumulativeGasUsed":"0x20","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","data":"0x","logIndex":"0x1","removed":false,"topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"}],"logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","root":"0x0000000000000000000000000000000000000000000000000000000000000000","status":null,"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0"},"id":1}"#;

assert_eq!(tester.io.handle_request_sync(request), Some(response.to_owned()));
}
Expand Down
4 changes: 2 additions & 2 deletions rpc/src/v1/tests/mocked/eth_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,15 @@ fn should_subscribe_to_logs() {
// Check notifications (enacted)
handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO);
let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","removed":false,"topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
+ &format!("0x{:x}", tx_hash)
+ r#"","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"},"subscription":"0x416d77337e24399d"}}"#;
assert_eq!(res, Some(response.into()));

// Check notifications (retracted)
handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Retracted)]), vec![], vec![], DURATION_ZERO);
let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","removed":true,"topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
+ &format!("0x{:x}", tx_hash)
+ r#"","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"removed"},"subscription":"0x416d77337e24399d"}}"#;
assert_eq!(res, Some(response.into()));
Expand Down
Loading

0 comments on commit 75cff42

Please sign in to comment.