-
Notifications
You must be signed in to change notification settings - Fork 11
/
rpc_callee.rs
122 lines (99 loc) · 3.54 KB
/
rpc_callee.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use std::error::Error;
use std::sync::atomic::{AtomicU64, Ordering};
use lazy_static::*;
use wamp_async::{
Client, ClientConfig, ClientState, SerializerType, WampArgs, WampError, WampKwArgs,
};
/// Number of RPC calls served so far, shared by every registered handler.
///
/// `AtomicU64::new` is a `const fn`, so the counter can be an ordinary
/// `static` without any lazy initialization.
static RPC_CALL_COUNT: AtomicU64 = AtomicU64::new(0);
/// RPC handler that logs the call and hands the received arguments back
/// unchanged.
///
/// Every invocation also increments the shared `RPC_CALL_COUNT` counter.
async fn echo(
    args: Option<WampArgs>,
    kwargs: Option<WampKwArgs>,
) -> Result<(Option<WampArgs>, Option<WampKwArgs>), WampError> {
    println!("peer.echo {:?} {:?}", args, kwargs);

    // Only the increment matters; the previous count is not needed.
    let _previous_count = RPC_CALL_COUNT.fetch_add(1, Ordering::Relaxed);

    let reply = (args, kwargs);
    Ok(reply)
}
/// Positional arguments accepted by `strict_echo`: a pair of `i64` values.
///
/// `strict_echo` deserializes the incoming positional arguments into this type
/// and serializes it back for the reply.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct MyArgs(i64, i64);
/// Keyword arguments accepted by `strict_echo`.
///
/// `strict_echo` deserializes the incoming keyword arguments into this type
/// and serializes it back for the reply.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct MyKwArgs {
// Value of the `name` keyword argument.
name: String,
// Value of the `age` keyword argument; must fit in a `u8`.
age: u8,
}
/// RPC handler that echoes its arguments only if they match the expected
/// structure.
///
/// The raw positional and keyword arguments are deserialized into `MyArgs` and
/// `MyKwArgs`, logged, then serialized again and returned. An absent argument
/// set stays absent. Any conversion failure is returned as the error, and in
/// the deserialization case this happens before the call counter is touched.
async fn strict_echo(
    args: Option<WampArgs>,
    kwargs: Option<WampKwArgs>,
) -> Result<(Option<WampArgs>, Option<WampKwArgs>), WampError> {
    println!("peer.strict_echo raw input {:?} {:?}", args, kwargs);

    // Deserialize whatever was supplied; positional arguments are checked first.
    let typed_args: Option<MyArgs> = match args {
        Some(raw) => Some(wamp_async::try_from_args(raw)?),
        None => None,
    };
    let typed_kwargs: Option<MyKwArgs> = match kwargs {
        Some(raw) => Some(wamp_async::try_from_kwargs(raw)?),
        None => None,
    };
    println!(
        "peer.strict_echo deserialized input {:?} {:?}",
        typed_args, typed_kwargs
    );

    // Only the increment matters; the previous count is not needed.
    let _previous_count = RPC_CALL_COUNT.fetch_add(1, Ordering::Relaxed);

    // Serialize the validated values back into the generic WAMP containers.
    let reply_args = match typed_args {
        Some(value) => Some(wamp_async::try_into_args(value)?),
        None => None,
    };
    let reply_kwargs = match typed_kwargs {
        Some(value) => Some(wamp_async::try_into_kwargs(value)?),
        None => None,
    };
    Ok((reply_args, reply_kwargs))
}
/// Example WAMP callee.
///
/// Connects to `wss://localhost:8080/ws`, joins `realm1`, registers the
/// `peer.echo` and `peer.strict_echo` procedures, then polls the shared call
/// counter once per second until it reaches 4 or the client is no longer
/// connected. Afterwards the procedures are unregistered and the client leaves
/// the realm and disconnects.
///
/// # Errors
///
/// Returns an error if connecting, joining the realm, registering or
/// unregistering a procedure, or leaving the realm fails; if `Client::connect`
/// returns no RPC event queue; or if the client reports a disconnect that
/// carries an error.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    env_logger::init();

    // Connect to the server
    let (mut client, (evt_loop, rpc_evt_queue)) = Client::connect(
        "wss://localhost:8080/ws",
        Some(
            ClientConfig::default()
                // Allow invalid/self signed certs
                .set_ssl_verify(false)
                // Use MsgPack first or fallback to Json
                .set_serializers(vec![SerializerType::MsgPack, SerializerType::Json]),
        ),
    )
    .await?;
    println!("Connected !!");

    // Check for the queue here rather than unwrapping inside the spawned task:
    // a panic in a detached task would not stop `main`, which would then keep
    // polling for calls that can never be served.
    let mut rpc_event_queue =
        rpc_evt_queue.ok_or("Client::connect did not return an RPC event queue")?;

    // Spawn the event loop
    tokio::spawn(evt_loop);

    // Handle RPC events in separate tasks
    tokio::spawn(async move {
        // Dispatching stops as soon as `recv` yields `None`.
        while let Some(rpc_event) = rpc_event_queue.recv().await {
            // Execute the function call
            tokio::spawn(rpc_event);
        }
    });

    println!("Joining realm");
    client.join_realm("realm1").await?;

    // Register our functions to a uri
    let echo_rpc_id = client.register("peer.echo", echo).await?;
    let strict_echo_rpc_id = client.register("peer.strict_echo", strict_echo).await?;

    // NOTE: both `echo` and `strict_echo` increment RPC_CALL_COUNT, so calls to
    // either procedure count towards the threshold below.
    println!("Waiting for 'peer.echo' to be called at least 4 times");
    while RPC_CALL_COUNT.load(Ordering::Relaxed) < 4 && client.is_connected() {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }

    // Client should not have disconnected
    if let ClientState::Disconnected(Err(e)) = client.get_cur_status() {
        println!("Client disconnected because of : {:?}", e);
        return Err("Unexpected disconnect".into());
    }

    client.unregister(echo_rpc_id).await?;
    client.unregister(strict_echo_rpc_id).await?;

    println!("Leaving realm");
    client.leave_realm().await?;

    client.disconnect().await;
    Ok(())
}