From 5ce8c653b595f8ef566730e1b9f01d32e0822108 Mon Sep 17 00:00:00 2001 From: driftluo Date: Fri, 9 Aug 2024 15:49:12 +0800 Subject: [PATCH] fix: add limit to get cells --- resource/ckb.toml | 6 ++ util/app-config/src/configs/indexer.rs | 4 ++ util/gen-types/src/lib.rs | 2 + util/indexer/src/service.rs | 64 ++++++++++++++++++- util/jsonrpc-types/src/blockchain.rs | 2 +- util/jsonrpc-types/src/bytes.rs | 4 +- util/rich-indexer/src/indexer/mod.rs | 4 ++ .../async_indexer_handle/get_cells.rs | 6 ++ .../async_indexer_handle/get_transactions.rs | 6 ++ .../async_indexer_handle/mod.rs | 9 ++- util/rich-indexer/src/indexer_handle/mod.rs | 9 ++- util/rich-indexer/src/service.rs | 8 ++- util/rich-indexer/src/tests/insert.rs | 6 +- util/rich-indexer/src/tests/query.rs | 25 ++++---- 14 files changed, 131 insertions(+), 24 deletions(-) diff --git a/resource/ckb.toml b/resource/ckb.toml index e47a37cff4..c2091ad87e 100644 --- a/resource/ckb.toml +++ b/resource/ckb.toml @@ -212,6 +212,12 @@ block_uncles_cache_size = 30 # cell_filter = "let script = output.type;script!=() && script.code_hash == \"0x00000000000000000000000000000000000000000000000000545950455f4944\"" # # The initial tip can be set higher than the current indexer tip as the starting height for indexing. # init_tip_hash = "0x8fbd0ec887159d2814cee475911600e3589849670f5ee1ed9798b38fdeef4e44" +# By default, there is no limitation on the size of indexer request +# However, because serde json serialization consumes too much memory(10x), +# it may cause the physical machine to become unresponsive. +# We recommend a consumption limit of 2g, which is 400 as the limit, +# which is a safer approach +# request_limit = 400 # # # CKB rich-indexer has its unique configuration. # [indexer_v2.rich_indexer] diff --git a/util/app-config/src/configs/indexer.rs b/util/app-config/src/configs/indexer.rs index 5966b6673b..1f44a68aa6 100644 --- a/util/app-config/src/configs/indexer.rs +++ b/util/app-config/src/configs/indexer.rs @@ -35,6 +35,9 @@ pub struct IndexerConfig { /// The init tip block hash #[serde(default)] pub init_tip_hash: Option, + /// limit of indexer reqeust + #[serde(default)] + pub request_limit: Option, /// Rich indexer config options #[serde(default)] pub rich_indexer: RichIndexerConfig, @@ -56,6 +59,7 @@ impl Default for IndexerConfig { db_background_jobs: None, db_keep_log_file_num: None, init_tip_hash: None, + request_limit: None, rich_indexer: RichIndexerConfig::default(), } } diff --git a/util/gen-types/src/lib.rs b/util/gen-types/src/lib.rs index c2aa5c5183..bceae47fda 100644 --- a/util/gen-types/src/lib.rs +++ b/util/gen-types/src/lib.rs @@ -19,8 +19,10 @@ pub use molecule::bytes; cfg_if::cfg_if! { if #[cfg(feature = "std")] { + #[allow(unused_imports)] use std::{vec, borrow}; } else { + #[allow(unused_imports)] use alloc::{vec, borrow}; } } diff --git a/util/indexer/src/service.rs b/util/indexer/src/service.rs index 774276e392..849bb434c2 100644 --- a/util/indexer/src/service.rs +++ b/util/indexer/src/service.rs @@ -19,6 +19,7 @@ use rocksdb::{prelude::*, Direction, IteratorMode}; use std::convert::TryInto; use std::num::NonZeroUsize; use std::sync::{Arc, RwLock}; +use std::usize; pub(crate) const SUBSCRIBER_NAME: &str = "Indexer"; const DEFAULT_LOG_KEEP_NUM: usize = 1; @@ -31,6 +32,7 @@ pub struct IndexerService { sync: IndexerSyncService, block_filter: Option, cell_filter: Option, + request_limit: usize, } impl IndexerService { @@ -56,6 +58,7 @@ impl IndexerService { sync, block_filter: config.block_filter.clone(), cell_filter: config.cell_filter.clone(), + request_limit: config.request_limit.unwrap_or(usize::MAX), } } @@ -67,6 +70,7 @@ impl IndexerService { IndexerHandle { store: self.store.clone(), pool: self.sync.pool(), + request_limit: self.request_limit, } } @@ -124,6 +128,7 @@ impl IndexerService { pub struct IndexerHandle { pub(crate) store: RocksdbStore, pub(crate) pool: Option>>, + request_limit: usize, } impl IndexerHandle { @@ -168,6 +173,12 @@ impl IndexerHandle { if limit == 0 { return Err(Error::invalid_params("limit should be greater than 0")); } + if limit > self.request_limit { + return Err(Error::invalid_params(format!( + "limit must be less than {}", + self.request_limit, + ))); + } let (prefix, from_key, direction, skip) = build_query_options( &search_key, @@ -334,6 +345,12 @@ impl IndexerHandle { if limit == 0 { return Err(Error::invalid_params("limit should be greater than 0")); } + if limit > self.request_limit { + return Err(Error::invalid_params(format!( + "limit must be less than {}", + self.request_limit, + ))); + } if search_key .script_search_mode @@ -926,12 +943,13 @@ mod tests { #[test] fn rpc() { - let store = new_store("rpc"); + let store: RocksdbStore = new_store("rpc"); let pool = Arc::new(RwLock::new(Pool::default())); let indexer = Indexer::new(store.clone(), 10, 100, None, CustomFilters::new(None, None)); let rpc = IndexerHandle { store, pool: Some(Arc::clone(&pool)), + request_limit: usize::MAX, }; // setup test data @@ -1516,7 +1534,11 @@ mod tests { fn script_search_mode_rpc() { let store = new_store("script_search_mode_rpc"); let indexer = Indexer::new(store.clone(), 10, 100, None, CustomFilters::new(None, None)); - let rpc = IndexerHandle { store, pool: None }; + let rpc = IndexerHandle { + store, + pool: None, + request_limit: usize::MAX, + }; // setup test data let lock_script1 = ScriptBuilder::default() @@ -1755,11 +1777,47 @@ mod tests { ); } + #[test] + fn test_request_limit() { + let store = new_store("script_search_mode_rpc"); + let rpc = IndexerHandle { + store, + pool: None, + request_limit: 2, + }; + + let lock_script1 = ScriptBuilder::default() + .code_hash(H256(rand::random()).pack()) + .hash_type(ScriptHashType::Type.into()) + .args(Bytes::from(b"lock_script1".to_vec()).pack()) + .build(); + let data = [0u8; 7]; + let res = rpc.get_cells( + IndexerSearchKey { + script: lock_script1.into(), + filter: Some(IndexerSearchKeyFilter { + output_data: Some(JsonBytes::from_vec(data.to_vec())), + output_data_filter_mode: Some(IndexerSearchMode::Prefix), + ..Default::default() + }), + ..Default::default() + }, + IndexerOrder::Asc, + 1000.into(), + None, + ); + assert!(res.is_err()) + } + #[test] fn output_data_filter_mode_rpc() { let store = new_store("script_search_mode_rpc"); let indexer = Indexer::new(store.clone(), 10, 100, None, CustomFilters::new(None, None)); - let rpc = IndexerHandle { store, pool: None }; + let rpc = IndexerHandle { + store, + pool: None, + request_limit: usize::MAX, + }; // setup test data let lock_script1 = ScriptBuilder::default() diff --git a/util/jsonrpc-types/src/blockchain.rs b/util/jsonrpc-types/src/blockchain.rs index 47fbfff606..247b394c1c 100644 --- a/util/jsonrpc-types/src/blockchain.rs +++ b/util/jsonrpc-types/src/blockchain.rs @@ -116,7 +116,7 @@ impl From for Script { fn from(input: packed::Script) -> Script { Script { code_hash: input.code_hash().unpack(), - args: JsonBytes::from_bytes(input.args().unpack()), + args: JsonBytes::from_vec(input.args().unpack()), hash_type: core::ScriptHashType::try_from(input.hash_type()) .expect("checked data") .into(), diff --git a/util/jsonrpc-types/src/bytes.rs b/util/jsonrpc-types/src/bytes.rs index 1e270f8d54..b5b7a6bcf4 100644 --- a/util/jsonrpc-types/src/bytes.rs +++ b/util/jsonrpc-types/src/bytes.rs @@ -60,13 +60,13 @@ impl JsonBytes { impl From for JsonBytes { fn from(input: packed::Bytes) -> Self { - JsonBytes::from_bytes(input.raw_data()) + JsonBytes::from_vec(input.raw_data().to_vec()) } } impl<'a> From<&'a packed::Bytes> for JsonBytes { fn from(input: &'a packed::Bytes) -> Self { - JsonBytes::from_bytes(input.raw_data()) + JsonBytes::from_vec(input.raw_data().to_vec()) } } diff --git a/util/rich-indexer/src/indexer/mod.rs b/util/rich-indexer/src/indexer/mod.rs index cd5beb3a67..32c395f775 100644 --- a/util/rich-indexer/src/indexer/mod.rs +++ b/util/rich-indexer/src/indexer/mod.rs @@ -37,6 +37,7 @@ use std::sync::{Arc, RwLock}; pub(crate) struct RichIndexer { async_rich_indexer: AsyncRichIndexer, async_runtime: Handle, + request_limit: usize, } impl RichIndexer { @@ -46,10 +47,12 @@ impl RichIndexer { pool: Option>>, custom_filters: CustomFilters, async_runtime: Handle, + request_limit: usize, ) -> Self { Self { async_rich_indexer: AsyncRichIndexer::new(store, pool, custom_filters), async_runtime, + request_limit, } } } @@ -61,6 +64,7 @@ impl IndexerSync for RichIndexer { self.async_rich_indexer.store.clone(), self.async_rich_indexer.pool.clone(), self.async_runtime.clone(), + self.request_limit, ); indexer_handle .get_indexer_tip() diff --git a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs index 63a89d4800..100a5eaca0 100644 --- a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs +++ b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs @@ -26,6 +26,12 @@ impl AsyncRichIndexerHandle { if limit == 0 { return Err(Error::invalid_params("limit should be greater than 0")); } + if limit as usize > self.request_limit { + return Err(Error::invalid_params(format!( + "limit must be less than {}", + self.request_limit, + ))); + } let mut param_index = 1; diff --git a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs index d8d58e0d68..ff58aa0a6f 100644 --- a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs +++ b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs @@ -24,6 +24,12 @@ impl AsyncRichIndexerHandle { if limit == 0 { return Err(Error::invalid_params("limit should be greater than 0")); } + if limit as usize > self.request_limit { + return Err(Error::invalid_params(format!( + "limit must be less than {}", + self.request_limit, + ))); + } search_key.filter = convert_max_values_in_search_filter(&search_key.filter); let mut tx = self diff --git a/util/rich-indexer/src/indexer_handle/async_indexer_handle/mod.rs b/util/rich-indexer/src/indexer_handle/async_indexer_handle/mod.rs index 7f6032ef25..4a99cbcc32 100644 --- a/util/rich-indexer/src/indexer_handle/async_indexer_handle/mod.rs +++ b/util/rich-indexer/src/indexer_handle/async_indexer_handle/mod.rs @@ -23,12 +23,17 @@ use std::sync::{Arc, RwLock}; pub struct AsyncRichIndexerHandle { store: SQLXPool, pool: Option>>, + request_limit: usize, } impl AsyncRichIndexerHandle { /// Construct new AsyncRichIndexerHandle instance - pub fn new(store: SQLXPool, pool: Option>>) -> Self { - Self { store, pool } + pub fn new(store: SQLXPool, pool: Option>>, request_limit: usize) -> Self { + Self { + store, + pool, + request_limit, + } } } diff --git a/util/rich-indexer/src/indexer_handle/mod.rs b/util/rich-indexer/src/indexer_handle/mod.rs index 10d1e1e62e..f96c8f590c 100644 --- a/util/rich-indexer/src/indexer_handle/mod.rs +++ b/util/rich-indexer/src/indexer_handle/mod.rs @@ -22,9 +22,14 @@ pub struct RichIndexerHandle { impl RichIndexerHandle { /// Construct new RichIndexerHandle instance - pub fn new(store: SQLXPool, pool: Option>>, async_handle: Handle) -> Self { + pub fn new( + store: SQLXPool, + pool: Option>>, + async_handle: Handle, + request_limit: usize, + ) -> Self { Self { - async_handle: AsyncRichIndexerHandle::new(store, pool), + async_handle: AsyncRichIndexerHandle::new(store, pool, request_limit), async_runtime: async_handle, } } diff --git a/util/rich-indexer/src/service.rs b/util/rich-indexer/src/service.rs index 716b08f2d2..17c680c108 100644 --- a/util/rich-indexer/src/service.rs +++ b/util/rich-indexer/src/service.rs @@ -1,5 +1,7 @@ //!The rich-indexer service. +use std::usize; + use crate::indexer::RichIndexer; use crate::store::SQLXPool; use crate::{AsyncRichIndexerHandle, RichIndexerHandle}; @@ -19,6 +21,7 @@ pub struct RichIndexerService { block_filter: Option, cell_filter: Option, async_handle: Handle, + request_limit: usize, } impl RichIndexerService { @@ -47,6 +50,7 @@ impl RichIndexerService { block_filter: config.block_filter.clone(), cell_filter: config.cell_filter.clone(), async_handle, + request_limit: config.request_limit.unwrap_or(usize::MAX), } } @@ -56,6 +60,7 @@ impl RichIndexerService { self.sync.pool(), CustomFilters::new(self.block_filter.as_deref(), self.cell_filter.as_deref()), self.async_handle.clone(), + self.request_limit, ) } @@ -83,6 +88,7 @@ impl RichIndexerService { self.store.clone(), self.sync.pool(), self.async_handle.clone(), + self.request_limit, ) } @@ -91,6 +97,6 @@ impl RichIndexerService { /// The returned handle can be used to get data from rich-indexer, /// and can be cloned to allow moving the Handle to other threads. pub fn async_handle(&self) -> AsyncRichIndexerHandle { - AsyncRichIndexerHandle::new(self.store.clone(), self.sync.pool()) + AsyncRichIndexerHandle::new(self.store.clone(), self.sync.pool(), self.request_limit) } } diff --git a/util/rich-indexer/src/tests/insert.rs b/util/rich-indexer/src/tests/insert.rs index 7ca08228a1..1c8fc9e26a 100644 --- a/util/rich-indexer/src/tests/insert.rs +++ b/util/rich-indexer/src/tests/insert.rs @@ -1,3 +1,5 @@ +use std::usize; + use super::*; use ckb_types::{ @@ -76,7 +78,7 @@ async fn with_custom_block_filter() { None, ), ); - let indexer_handle = AsyncRichIndexerHandle::new(storage, None); + let indexer_handle = AsyncRichIndexerHandle::new(storage, None, usize::MAX); let lock_script1 = ScriptBuilder::default() .code_hash(H256(rand::random()).pack()) @@ -290,7 +292,7 @@ async fn with_custom_cell_filter() { Some(r#"output.type?.args == "0x747970655f73637269707431""#), ), ); - let indexer_handle = AsyncRichIndexerHandle::new(storage, None); + let indexer_handle = AsyncRichIndexerHandle::new(storage, None, usize::MAX); let lock_script1 = ScriptBuilder::default() .code_hash(H256(rand::random()).pack()) diff --git a/util/rich-indexer/src/tests/query.rs b/util/rich-indexer/src/tests/query.rs index 80731de569..8adcba4aeb 100644 --- a/util/rich-indexer/src/tests/query.rs +++ b/util/rich-indexer/src/tests/query.rs @@ -12,13 +12,16 @@ use ckb_types::{ H256, }; -use std::sync::{Arc, RwLock}; +use std::{ + sync::{Arc, RwLock}, + usize, +}; use tokio::test; #[test] async fn test_query_tip() { let pool = connect_sqlite(MEMORY_DB).await; - let indexer = AsyncRichIndexerHandle::new(pool.clone(), None); + let indexer = AsyncRichIndexerHandle::new(pool.clone(), None, usize::MAX); let res = indexer.get_indexer_tip().await.unwrap(); assert!(res.is_none()); @@ -34,7 +37,7 @@ async fn test_query_tip() { #[test] async fn get_cells() { let pool = connect_sqlite(MEMORY_DB).await; - let indexer = AsyncRichIndexerHandle::new(pool.clone(), None); + let indexer = AsyncRichIndexerHandle::new(pool.clone(), None, usize::MAX); let res = indexer.get_indexer_tip().await.unwrap(); assert!(res.is_none()); @@ -163,7 +166,7 @@ async fn get_cells() { #[test] async fn get_cells_filter_data() { let pool = connect_sqlite(MEMORY_DB).await; - let indexer = AsyncRichIndexerHandle::new(pool.clone(), None); + let indexer = AsyncRichIndexerHandle::new(pool.clone(), None, usize::MAX); let res = indexer.get_indexer_tip().await.unwrap(); assert!(res.is_none()); @@ -219,7 +222,7 @@ async fn get_cells_filter_data() { #[test] async fn get_cells_by_cursor() { let pool = connect_sqlite(MEMORY_DB).await; - let indexer = AsyncRichIndexerHandle::new(pool.clone(), None); + let indexer = AsyncRichIndexerHandle::new(pool.clone(), None, usize::MAX); let res = indexer.get_indexer_tip().await.unwrap(); assert!(res.is_none()); @@ -281,7 +284,7 @@ async fn get_cells_by_cursor() { #[test] async fn get_transactions_ungrouped() { let pool = connect_sqlite(MEMORY_DB).await; - let indexer = AsyncRichIndexerHandle::new(pool.clone(), None); + let indexer = AsyncRichIndexerHandle::new(pool.clone(), None, usize::MAX); insert_blocks(pool).await; @@ -405,7 +408,7 @@ async fn get_transactions_ungrouped() { #[test] async fn get_transactions_grouped() { let pool = connect_sqlite(MEMORY_DB).await; - let indexer = AsyncRichIndexerHandle::new(pool.clone(), None); + let indexer = AsyncRichIndexerHandle::new(pool.clone(), None, usize::MAX); insert_blocks(pool).await; @@ -486,7 +489,7 @@ async fn get_transactions_grouped() { #[test] async fn get_cells_capacity() { let pool = connect_sqlite(MEMORY_DB).await; - let indexer = AsyncRichIndexerHandle::new(pool.clone(), None); + let indexer = AsyncRichIndexerHandle::new(pool.clone(), None, usize::MAX); insert_blocks(pool).await; @@ -557,7 +560,7 @@ async fn rpc() { let store = connect_sqlite(MEMORY_DB).await; let pool = Arc::new(RwLock::new(Pool::default())); let indexer = AsyncRichIndexer::new(store.clone(), None, CustomFilters::new(None, None)); - let rpc = AsyncRichIndexerHandle::new(store, Some(Arc::clone(&pool))); + let rpc = AsyncRichIndexerHandle::new(store, Some(Arc::clone(&pool)), usize::MAX); // setup test data let lock_script1 = ScriptBuilder::default() @@ -1165,7 +1168,7 @@ async fn rpc() { async fn script_search_mode_rpc() { let pool = connect_sqlite(MEMORY_DB).await; let indexer = AsyncRichIndexer::new(pool.clone(), None, CustomFilters::new(None, None)); - let rpc = AsyncRichIndexerHandle::new(pool, None); + let rpc = AsyncRichIndexerHandle::new(pool, None, usize::MAX); // setup test data let lock_script1 = ScriptBuilder::default() @@ -1414,7 +1417,7 @@ async fn script_search_mode_rpc() { async fn output_data_filter_mode_rpc() { let pool = connect_sqlite(MEMORY_DB).await; let indexer = AsyncRichIndexer::new(pool.clone(), None, CustomFilters::new(None, None)); - let rpc = AsyncRichIndexerHandle::new(pool, None); + let rpc = AsyncRichIndexerHandle::new(pool, None, usize::MAX); // setup test data let lock_script1 = ScriptBuilder::default()