Skip to content

Commit

Permalink
add informative printlns in streaming example
Browse files Browse the repository at this point in the history
  • Loading branch information
dwrensha committed Sep 21, 2024
1 parent 088612e commit b1e0b24
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
1 change: 1 addition & 0 deletions capnp-rpc/examples/streaming/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ capnp = { path = "../../../capnp" }
futures = "0.3.0"
rand = "0.8.5"
sha2 = { version = "0.10.8" }
base16 = { version = "0.2" }
tokio = { version = "1.0.0", features = ["net", "rt", "macros"]}
tokio-util = { version = "0.7.4", features = ["compat"] }

Expand Down
21 changes: 13 additions & 8 deletions capnp-rpc/examples/streaming/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
return Ok(());
}

let stream_size: usize = str::parse(&args[3]).unwrap();
let window_size: usize = str::parse(&args[4]).unwrap();
let stream_size: u64 = str::parse(&args[3]).unwrap();
let window_size: u64 = str::parse(&args[4]).unwrap();

let addr = args[2]
.to_socket_addrs()?
Expand All @@ -36,7 +36,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
rpc_network.set_window_size(window_size);
rpc_network.set_window_size(window_size as usize);
let mut rpc_system = RpcSystem::new(rpc_network, None);
let receiver: receiver::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
tokio::task::spawn_local(rpc_system);
Expand All @@ -47,22 +47,27 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut rng = rand::thread_rng();
let mut hasher = Sha256::new();
let bytestream = pipeline.get_stream();
let mut bytes_written: u32 = 0;
let mut bytes_written: u64 = 0;
const CHUNK_SIZE: u32 = 4096;
while bytes_written < stream_size as u32 {
while bytes_written < stream_size {
let mut request = bytestream.write_request();
let body = request.get();
let buf = body.init_bytes(CHUNK_SIZE);
let this_chunk_size = u64::min(CHUNK_SIZE as u64, stream_size - bytes_written);
let buf = body.init_bytes(this_chunk_size as u32);
rng.fill(buf);
hasher.update(buf);
request.send().await?;
bytes_written += CHUNK_SIZE;
bytes_written += this_chunk_size;
}
let local_sha256 = hasher.finalize();
println!(
"wrote {bytes_written} bytes with hash {}",
base16::encode_lower(&local_sha256[..])
);
bytestream.end_request().send().promise.await?;
let response = promise.await?;

let sha256 = response.get()?.get_sha256()?;
let local_sha256 = hasher.finalize();
assert_eq!(sha256, &local_sha256[..]);
Ok(())
})
Expand Down
11 changes: 10 additions & 1 deletion capnp-rpc/examples/streaming/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ use sha2::{Digest, Sha256};

struct ByteStreamImpl {
hasher: Sha256,
bytes_received: u32,
hash_sender: Option<oneshot::Sender<Vec<u8>>>,
}

impl ByteStreamImpl {
fn new(hash_sender: oneshot::Sender<Vec<u8>>) -> Self {
Self {
hasher: Sha256::new(),
bytes_received: 0,
hash_sender: Some(hash_sender),
}
}
Expand All @@ -28,6 +30,7 @@ impl byte_stream::Server for ByteStreamImpl {
fn write(&mut self, params: byte_stream::WriteParams) -> Promise<(), Error> {
let bytes = pry!(pry!(params.get()).get_bytes());
self.hasher.update(bytes);
self.bytes_received += bytes.len() as u32;
Promise::ok(())
}

Expand All @@ -37,8 +40,14 @@ impl byte_stream::Server for ByteStreamImpl {
_results: byte_stream::EndResults,
) -> Promise<(), Error> {
let hasher = std::mem::take(&mut self.hasher);
let hash = hasher.finalize()[..].to_vec();
println!(
"received {} bytes with hash {}",
self.bytes_received,
base16::encode_lower(&hash[..])
);
if let Some(sender) = self.hash_sender.take() {
let _ = sender.send(hasher.finalize()[..].to_vec());
let _ = sender.send(hash);
}
Promise::ok(())
}
Expand Down

0 comments on commit b1e0b24

Please sign in to comment.