Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temporarily remove RPC query parsing #858

Merged
merged 1 commit into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ websocket-client = [
bytes = "1.0"
chrono = "0.4"
getrandom = "0.1"
# TODO(thane): Use a released version once support for inverted patterns is released.
# See https://github.com/kevinmehall/rust-peg/pull/245
peg = { git = "https://github.com/kevinmehall/rust-peg.git", rev = "ba6019539b2cf80289190cbb9537c94113b6b7d1" }
pin-project = "1.0.1"
serde = { version = "1", features = [ "derive" ] }
serde_bytes = "0.11"
Expand Down
120 changes: 3 additions & 117 deletions rpc/src/client/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
//! CLI for performing simple interactions against a Tendermint node's RPC.

use futures::StreamExt;
use std::str::FromStr;
use std::time::Duration;
use structopt::StructOpt;
use tendermint::abci::{Path, Transaction};
use tendermint_rpc::query::Query;
use tendermint_rpc::{
Client, Error, HttpClient, Order, Paging, Result, Scheme, Subscription, SubscriptionClient,
Url, WebSocketClient,
Client, Error, HttpClient, Paging, Result, Scheme, SubscriptionClient, Url, WebSocketClient,
};
use tracing::level_filters::LevelFilter;
use tracing::{error, info, warn};
Expand Down Expand Up @@ -45,18 +41,7 @@ struct Opt {
enum Request {
#[structopt(flatten)]
ClientRequest(ClientRequest),
/// Subscribe to receive events produced by a specific query.
Subscribe {
/// The query against which events will be matched.
query: Query,
/// The maximum number of events to receive before terminating.
#[structopt(long)]
max_events: Option<u32>,
/// The maximum amount of time (in seconds) to listen for events before
/// terminating.
#[structopt(long)]
max_time: Option<u32>,
},
// TODO(thane): Implement subscription functionality
}

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -127,19 +112,7 @@ enum ClientRequest {
NetInfo,
/// Get Tendermint status (node info, public key, latest block hash, etc.).
Status,
/// Search for transactions with their results.
TxSearch {
/// The query against which transactions should be matched.
query: Query,
#[structopt(long, default_value = "1")]
page: u32,
#[structopt(long, default_value = "10")]
per_page: u8,
#[structopt(long, default_value = "asc")]
order: Order,
#[structopt(long)]
prove: bool,
},
// TODO(thane): Implement txsearch endpoint.
/// Get the validators at the given height.
Validators {
/// The height at which to query the validators.
Expand Down Expand Up @@ -235,7 +208,6 @@ async fn http_request(url: Url, proxy_url: Option<Url>, req: Request) -> Result<

match req {
Request::ClientRequest(r) => client_request(&client, r).await,
_ => Err(Error::invalid_params("HTTP/S clients do not support subscription capabilities (please use the WebSocket client instead)"))
}
}

Expand All @@ -246,11 +218,6 @@ async fn websocket_request(url: Url, req: Request) -> Result<()> {

let result = match req {
Request::ClientRequest(r) => client_request(&client, r).await,
Request::Subscribe {
query,
max_events,
max_time,
} => subscription_client_request(&client, query, max_events, max_time).await,
};

client.close()?;
Expand Down Expand Up @@ -322,17 +289,6 @@ where
ClientRequest::Health => serde_json::to_string_pretty(&client.health().await?)?,
ClientRequest::NetInfo => serde_json::to_string_pretty(&client.net_info().await?)?,
ClientRequest::Status => serde_json::to_string_pretty(&client.status().await?)?,
ClientRequest::TxSearch {
query,
page,
per_page,
order,
prove,
} => serde_json::to_string_pretty(
&client
.tx_search(query, prove, page, per_page, order)
.await?,
)?,
ClientRequest::Validators {
height,
all,
Expand All @@ -356,73 +312,3 @@ where
println!("{}", result);
Ok(())
}

async fn subscription_client_request<C>(
client: &C,
query: Query,
max_events: Option<u32>,
max_time: Option<u32>,
) -> Result<()>
where
C: SubscriptionClient,
{
info!("Creating subscription for query: {}", query);
let subs = client.subscribe(query).await?;
match max_time {
Some(secs) => recv_events_with_timeout(subs, max_events, secs).await,
None => recv_events(subs, max_events).await,
}
}

async fn recv_events_with_timeout(
mut subs: Subscription,
max_events: Option<u32>,
timeout_secs: u32,
) -> Result<()> {
let timeout = tokio::time::sleep(Duration::from_secs(timeout_secs as u64));
let mut event_count = 0u64;
tokio::pin!(timeout);
loop {
tokio::select! {
result_opt = subs.next() => {
let result = match result_opt {
Some(r) => r,
None => {
info!("The server terminated the subscription");
return Ok(());
}
};
let event = result?;
println!("{}", serde_json::to_string_pretty(&event)?);
event_count += 1;
if let Some(me) = max_events {
if event_count >= (me as u64) {
info!("Reached maximum number of events: {}", me);
return Ok(());
}
}
}
_ = &mut timeout => {
info!("Reached event receive timeout of {} seconds", timeout_secs);
return Ok(())
}
}
}
}

async fn recv_events(mut subs: Subscription, max_events: Option<u32>) -> Result<()> {
let mut event_count = 0u64;
while let Some(result) = subs.next().await {
let event = result?;
println!("{}", serde_json::to_string_pretty(&event)?);
event_count += 1;
if let Some(me) = max_events {
if event_count >= (me as u64) {
info!("Reached maximum number of events: {}", me);
return Ok(());
}
}
}
info!("The server terminated the subscription");
Ok(())
}
Loading