Skip to content

Commit

Permalink
Streaming RpcParams parsing (#401)
Browse files Browse the repository at this point in the history
* Streaming RpcParams parsing

* DRY RpcParams::one again

* Fix doc comments
  • Loading branch information
maciejhirsz authored Jul 1, 2021
1 parent f705e32 commit 095db9b
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 9 deletions.
99 changes: 92 additions & 7 deletions types/src/v2/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,90 @@ impl<'a> RpcParams<'a> {
Self(raw)
}

/// Attempt to parse all parameters as array or map into type T
fn next_inner<T>(&mut self) -> Option<Result<T, CallError>>
where
T: Deserialize<'a>,
{
let mut json = self.0?.trim_start();

match json.as_bytes().get(0)? {
b']' => {
self.0 = None;

return None;
}
b'[' | b',' => json = &json[1..],
_ => return Some(Err(CallError::InvalidParams)),
}

let mut iter = serde_json::Deserializer::from_str(json).into_iter::<T>();

match iter.next()? {
Ok(value) => {
self.0 = Some(&json[iter.byte_offset()..]);

Some(Ok(value))
}
Err(_) => {
self.0 = None;

Some(Err(CallError::InvalidParams))
}
}
}

/// Parse the next parameter to type `T`
///
/// ```
/// # use jsonrpsee_types::v2::params::RpcParams;
/// let mut params = RpcParams::new(Some(r#"[true, 10, "foo"]"#));
///
/// let a: bool = params.next().unwrap();
/// let b: i32 = params.next().unwrap();
/// let c: &str = params.next().unwrap();
///
/// assert_eq!(a, true);
/// assert_eq!(b, 10);
/// assert_eq!(c, "foo");
/// ```
pub fn next<T>(&mut self) -> Result<T, CallError>
where
T: Deserialize<'a>,
{
match self.next_inner() {
Some(result) => result,
None => Err(CallError::InvalidParams),
}
}

/// Parse the next optional parameter to type `Option<T>`.
///
/// The result will be `None` for `null`, and for missing values in the supplied JSON array.
///
/// ```
/// # use jsonrpsee_types::v2::params::RpcParams;
/// let mut params = RpcParams::new(Some(r#"[1, 2, null]"#));
///
/// let params: [Option<u32>; 4] = [
/// params.optional_next().unwrap(),
/// params.optional_next().unwrap(),
/// params.optional_next().unwrap(),
/// params.optional_next().unwrap(),
/// ];;
///
/// assert_eq!(params, [Some(1), Some(2), None, None]);
/// ```
pub fn optional_next<T>(&mut self) -> Result<Option<T>, CallError>
where
T: Deserialize<'a>,
{
match self.next_inner::<Option<T>>() {
Some(result) => result,
None => Ok(None),
}
}

/// Attempt to parse all parameters as array or map into type `T`
pub fn parse<T>(self) -> Result<T, CallError>
where
T: Deserialize<'a>,
Expand All @@ -77,7 +160,7 @@ impl<'a> RpcParams<'a> {
serde_json::from_str(params).map_err(|_| CallError::InvalidParams)
}

/// Attempt to parse only the first parameter from an array into type T
/// Attempt to parse parameters as an array of a single value of type `T`, and returns that value.
pub fn one<T>(self) -> Result<T, CallError>
where
T: Deserialize<'a>,
Expand Down Expand Up @@ -288,15 +371,17 @@ mod test {

#[test]
fn params_parse() {
let none = RpcParams::new(None);
assert!(none.one::<u64>().is_err());
let mut none = RpcParams::new(None);
assert!(none.next::<u64>().is_err());

let array_params = RpcParams::new(Some("[1, 2, 3]"));
let mut array_params = RpcParams::new(Some("[1, 2, 3]"));
let arr: Result<[u64; 3], _> = array_params.parse();
assert!(arr.is_ok());

let arr: Result<(u64, u64, u64), _> = array_params.parse();
assert!(arr.is_ok());
assert_eq!(array_params.next::<u64>().unwrap(), 1);
assert_eq!(array_params.next::<u64>().unwrap(), 2);
assert_eq!(array_params.next::<u64>().unwrap(), 3);
assert!(array_params.next::<u64>().is_err());

let array_one = RpcParams::new(Some("[1]"));
let one: Result<u64, _> = array_one.one();
Expand Down
4 changes: 2 additions & 2 deletions utils/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// use jsonrpsee_utils::server::rpc_module::RpcModule;
///
/// let mut ctx = RpcModule::new(99_usize);
/// ctx.register_subscription("sub", "unsub", |params, mut sink, ctx| {
/// let x: usize = params.one()?;
/// ctx.register_subscription("sub", "unsub", |mut params, mut sink, ctx| {
/// let x: usize = params.next()?;
/// std::thread::spawn(move || {
/// let sum = x + (*ctx);
/// sink.send(&sum)
Expand Down

0 comments on commit 095db9b

Please sign in to comment.