Skip to content

Commit

Permalink
fix: add limit to get cells
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Aug 9, 2024
1 parent b866491 commit ed2b597
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 63 deletions.
6 changes: 3 additions & 3 deletions util/gen-types/src/conversion/blockchain/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(feature = "std")]
mod std_env;

use crate::{borrow::ToOwned, bytes::Bytes, generated::packed, prelude::*, vec::Vec};
use crate::{bytes::Bytes, generated::packed, prelude::*, vec::Vec};

impl Pack<packed::Byte32> for [u8; 32] {
fn pack(&self) -> packed::Byte32 {
Expand Down Expand Up @@ -46,13 +46,13 @@ impl Pack<packed::Bytes> for Bytes {

impl<'r> Unpack<Bytes> for packed::BytesReader<'r> {
fn unpack(&self) -> Bytes {
Bytes::from(self.raw_data().to_owned())
Bytes::from(self.raw_data().to_vec())
}
}

impl Unpack<Bytes> for packed::Bytes {
fn unpack(&self) -> Bytes {
self.raw_data()
Bytes::from(self.raw_data().to_vec())
}
}

Expand Down
2 changes: 2 additions & 0 deletions util/gen-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ pub use molecule::bytes;

cfg_if::cfg_if! {
if #[cfg(feature = "std")] {
#[allow(unused_imports)]
use std::{vec, borrow};
} else {
#[allow(unused_imports)]
use alloc::{vec, borrow};
}
}
86 changes: 59 additions & 27 deletions util/indexer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ use std::sync::{Arc, RwLock};
pub(crate) const SUBSCRIBER_NAME: &str = "Indexer";
const DEFAULT_LOG_KEEP_NUM: usize = 1;
const DEFAULT_MAX_BACKGROUND_JOBS: usize = 6;
// We set the memory usage limit at 2 GB, meaning only 200 MB of data can be requested at a time.
// The maximum cell data size is 500 KB, and 400 cells can be requested at one time.
// related: https://github.com/serde-rs/json/issues/635
const DEFAULT_REQUEST_LIMIT: usize = 400;

/// Indexer service
#[derive(Clone)]
Expand Down Expand Up @@ -164,7 +168,7 @@ impl IndexerHandle {
));
}

let limit = limit.value() as usize;
let limit = std::cmp::min(limit.value() as usize, DEFAULT_REQUEST_LIMIT);
if limit == 0 {
return Err(Error::invalid_params("limit should be greater than 0"));
}
Expand Down Expand Up @@ -330,7 +334,7 @@ impl IndexerHandle {
limit: Uint32,
after_cursor: Option<JsonBytes>,
) -> Result<IndexerPagination<IndexerTx>, Error> {
let limit = limit.value() as usize;
let limit = std::cmp::min(limit.value() as usize, DEFAULT_REQUEST_LIMIT);
if limit == 0 {
return Err(Error::invalid_params("limit should be greater than 0"));
}
Expand Down Expand Up @@ -1688,35 +1692,63 @@ mod tests {
);

// test get_transactions rpc with exact search mode
let txs = rpc
.get_transactions(
IndexerSearchKey {
script: lock_script1.clone().into(),
script_search_mode: Some(IndexerSearchMode::Exact),
..Default::default()
},
IndexerOrder::Asc,
1000.into(),
None,
)
.unwrap();
let txs = {
let mut txs = IndexerPagination::new(Vec::new(), JsonBytes::from_bytes(Bytes::new()));
let mut last_key = None;
loop {
let txs_1 = rpc
.get_transactions(
IndexerSearchKey {
script: lock_script1.clone().into(),
script_search_mode: Some(IndexerSearchMode::Exact),
..Default::default()
},
IndexerOrder::Asc,
1000.into(),
last_key.clone(),
)
.unwrap();

if txs_1.objects.is_empty() {
break;
} else {
txs.objects.extend(txs_1.objects);
last_key = Some(txs_1.last_cursor);
}
}
txs
};

assert_eq!(total_blocks as usize * 3 - 1, txs.objects.len(), "total size should be cellbase tx count + total_block * 2 - 1 (genesis block only has one tx)");

// test get_transactions rpc group by tx hash with exact search mode
let txs = rpc
.get_transactions(
IndexerSearchKey {
script: lock_script1.clone().into(),
script_search_mode: Some(IndexerSearchMode::Exact),
group_by_transaction: Some(true),
..Default::default()
},
IndexerOrder::Asc,
1000.into(),
None,
)
.unwrap();
let txs = {
let mut txs = IndexerPagination::new(Vec::new(), JsonBytes::from_bytes(Bytes::new()));
let mut last_key = None;

loop {
let txs_1 = rpc
.get_transactions(
IndexerSearchKey {
script: lock_script1.clone().into(),
script_search_mode: Some(IndexerSearchMode::Exact),
group_by_transaction: Some(true),
..Default::default()
},
IndexerOrder::Asc,
1000.into(),
last_key.clone(),
)
.unwrap();
if txs_1.objects.is_empty() {
break;
} else {
txs.objects.extend(txs_1.objects);
last_key = Some(txs_1.last_cursor);
}
}
txs
};

assert_eq!(
total_blocks as usize * 2,
Expand Down
2 changes: 1 addition & 1 deletion util/jsonrpc-types/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl From<packed::Script> for Script {
fn from(input: packed::Script) -> Script {
Script {
code_hash: input.code_hash().unpack(),
args: JsonBytes::from_bytes(input.args().unpack()),
args: JsonBytes::from_vec(input.args().unpack()),
hash_type: core::ScriptHashType::try_from(input.hash_type())
.expect("checked data")
.into(),
Expand Down
4 changes: 2 additions & 2 deletions util/jsonrpc-types/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ impl JsonBytes {

impl From<packed::Bytes> for JsonBytes {
fn from(input: packed::Bytes) -> Self {
JsonBytes::from_bytes(input.raw_data())
JsonBytes::from_vec(input.raw_data().to_vec())
}
}

impl<'a> From<&'a packed::Bytes> for JsonBytes {
fn from(input: &'a packed::Bytes) -> Self {
JsonBytes::from_bytes(input.raw_data())
JsonBytes::from_vec(input.raw_data().to_vec())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl AsyncRichIndexerHandle {
limit: Uint32,
after: Option<JsonBytes>,
) -> Result<IndexerPagination<IndexerCell>, Error> {
let limit = limit.value();
let limit = std::cmp::min(limit.value(), DEFAULT_REQUEST_LIMIT);
if limit == 0 {
return Err(Error::invalid_params("limit should be greater than 0"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl AsyncRichIndexerHandle {
limit: Uint32,
after: Option<JsonBytes>,
) -> Result<IndexerPagination<IndexerTx>, Error> {
let limit = limit.value();
let limit = std::cmp::min(limit.value(), DEFAULT_REQUEST_LIMIT);
if limit == 0 {
return Err(Error::invalid_params("limit should be greater than 0"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use sqlx::Row;

use std::sync::{Arc, RwLock};

const DEFAULT_REQUEST_LIMIT: u32 = 400;

/// Async handle to the rich-indexer.
#[derive(Clone)]
pub struct AsyncRichIndexerHandle {
Expand Down
85 changes: 57 additions & 28 deletions util/rich-indexer/src/tests/query.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;

use ckb_indexer_sync::{CustomFilters, Pool};
use ckb_jsonrpc_types::{IndexerRange, IndexerSearchKeyFilter, IndexerTx};
use ckb_jsonrpc_types::{IndexerPagination, IndexerRange, IndexerSearchKeyFilter, IndexerTx};
use ckb_types::{
bytes::Bytes,
core::{
Expand Down Expand Up @@ -1339,37 +1339,66 @@ async fn script_search_mode_rpc() {
);

// test get_transactions rpc with exact search mode
let txs = rpc
.get_transactions(
IndexerSearchKey {
script: lock_script1.clone().into(),
script_search_mode: Some(IndexerSearchMode::Exact),
..Default::default()
},
IndexerOrder::Asc,
1000.into(),
None,
)
.await
.unwrap();
let txs = {
let mut txs = IndexerPagination::new(Vec::new(), JsonBytes::from_bytes(Bytes::new()));
let mut last_key = None;

loop {
let txs_1 = rpc
.get_transactions(
IndexerSearchKey {
script: lock_script1.clone().into(),
script_search_mode: Some(IndexerSearchMode::Exact),
..Default::default()
},
IndexerOrder::Asc,
1000.into(),
last_key.clone(),
)
.await
.unwrap();

if txs_1.objects.is_empty() {
break;
} else {
txs.objects.extend(txs_1.objects);
last_key = Some(txs_1.last_cursor);
}
}
txs
};

assert_eq!(total_blocks as usize * 3 - 1, txs.objects.len(), "total size should be cellbase tx count + total_block * 2 - 1 (genesis block only has one tx)");

// test get_transactions rpc group by tx hash with exact search mode
let txs = rpc
.get_transactions(
IndexerSearchKey {
script: lock_script1.clone().into(),
script_search_mode: Some(IndexerSearchMode::Exact),
group_by_transaction: Some(true),
..Default::default()
},
IndexerOrder::Asc,
1000.into(),
None,
)
.await
.unwrap();
let txs = {
let mut txs = IndexerPagination::new(Vec::new(), JsonBytes::from_bytes(Bytes::new()));
let mut last_key = None;

loop {
let txs_1 = rpc
.get_transactions(
IndexerSearchKey {
script: lock_script1.clone().into(),
script_search_mode: Some(IndexerSearchMode::Exact),
group_by_transaction: Some(true),
..Default::default()
},
IndexerOrder::Asc,
1000.into(),
last_key.clone(),
)
.await
.unwrap();
if txs_1.objects.is_empty() {
break;
} else {
txs.objects.extend(txs_1.objects);
last_key = Some(txs_1.last_cursor);
}
}
txs
};

assert_eq!(
total_blocks as usize * 2,
Expand Down

0 comments on commit ed2b597

Please sign in to comment.