Skip to content

Commit

Permalink
Merge pull request #17 from Totodore/ft-examples
Browse files Browse the repository at this point in the history
Examples + Refactoring
  • Loading branch information
Totodore authored Jun 17, 2023
2 parents bcb1f74 + ccd28cc commit 84eda00
Show file tree
Hide file tree
Showing 39 changed files with 2,168 additions and 735 deletions.
545 changes: 521 additions & 24 deletions Cargo.lock

Large diffs are not rendered by default.

27 changes: 12 additions & 15 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@ It takes full advantage of the [tower](https://docs.rs/tower/latest/tower/) and
* Handshake data
* Ack and emit with ack
* Binary
* Polling & Websocket transport
* Extensions on socket to add custom data to sockets

### Planned features :
* Improving the documentation
* Adding more tests & benchmars
* Other adapter to share state between server instances (like redis adapter), currently only the in memory adapter is implemented
* Better error handling
* Socket extensions to share state between sockets
* State recovery when a socket reconnects
* SocketIo v3 support (currently only v4 is supported)


### Socket.IO example echo implementation with Axum :
### Examples :
* [Chat app with Axum](./examples/src/chat)
* Echo implementation with Axum :
```rust
use axum::routing::get;
use axum::Server;
use serde::{Serialize, Deserialize};
use socketioxide::{Namespace, SocketIoLayer};
use tracing::info;
use tracing_subscriber::FmtSubscriber;
use serde_json::Value;

#[derive(Debug, Serialize, Deserialize)]
struct MyData {
Expand All @@ -44,31 +43,29 @@ struct MyData {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let subscriber = FmtSubscriber::builder().finish();
tracing::subscriber::set_global_default(subscriber)?;

info!("Starting server");
println!("Starting server");

let ns = Namespace::builder()
.add("/", |socket| async move {
info!("Socket connected on / namespace with id: {}", socket.sid);
println!("Socket connected on / namespace with id: {}", socket.sid);

// Add a callback triggered when the socket receive an 'abc' event
// The json data will be deserialized to MyData
socket.on("abc", |socket, data: MyData, bin, _| async move {
info!("Received abc event: {:?} {:?}", data, bin);
println!("Received abc event: {:?} {:?}", data, bin);
socket.bin(bin).emit("abc", data).ok();
});

// Add a callback triggered when the socket receive an 'acb' event
// Ackknowledge the message with the ack callback
socket.on("acb", |_, data: Value, bin, ack| async move {
info!("Received acb event: {:?} {:?}", data, bin);
println!("Received acb event: {:?} {:?}", data, bin);
ack.bin(bin).send(data).ok();
});
})
.add("/custom", |socket| async move {
info!("Socket connected on /custom namespace with id: {}", socket.sid);
println!("Socket connected on /custom namespace with id: {}", socket.sid);
})
.build();

Expand Down
13 changes: 9 additions & 4 deletions engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@ async-trait = "0.1.66"
base64 = "0.21.0"
bytes = "1.4.0"
futures = "0.3.27"
futures-core = "0.3.27"
http = "0.2.9"
http-body = "0.4.5"
hyper = { version = "0.14.25", features = ["http1", "http2", "server", "stream", "runtime"] }
lazy_static = "1.4.0"
pin-project = "1.0.12"
rs-snowflake = "0.6.0"
serde = { version = "1.0.155", features = ["derive"] }
serde_json = "1.0.94"
thiserror = "1.0.40"
tokio = "1.26.0"
tokio-tungstenite = "0.19.0"
tower = "0.4.13"
tower-http = "0.4.0"
tracing = "0.1.37"
rand = "0.8.5"
base64id = { version = "0.3.1", features = ["std", "rand", "serde"] }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }

[[bench]]
name = "benchmark_polling"
harness = false
8 changes: 4 additions & 4 deletions engineioxide/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ struct MyHandler;
impl EngineIoHandler for MyHandler {
type Data = ();

fn on_connect(self: Arc<Self>, socket: &Socket<Self>) {
fn on_connect(&self, socket: &Socket<Self>) {
println!("socket connect {}", socket.sid);
}
fn on_disconnect(self: Arc<Self>, socket: &Socket<Self>) {
fn on_disconnect(&self, socket: &Socket<Self>) {
println!("socket disconnect {}", socket.sid);
}

async fn on_message(self: Arc<Self>, msg: String, socket: &Socket<Self>) {
fn on_message(&self, msg: String, socket: &Socket<Self>) {
println!("Ping pong message {:?}", msg);
socket.emit(msg).ok();
}

async fn on_binary(self: Arc<Self>, data: Vec<u8>, socket: &Socket<Self>) {
fn on_binary(&self, data: Vec<u8>, socket: &Socket<Self>) {
println!("Ping pong binary message {:?}", data);
socket.emit_binary(data).ok();
}
Expand Down
162 changes: 162 additions & 0 deletions engineioxide/benches/benchmark_polling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use std::{sync::Arc, time::Duration};
use std::str::FromStr;

use bytes::{Buf, Bytes};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use engineioxide::{handler::EngineIoHandler, service::EngineIoService, socket::Socket};

use http::Request;
use http_body::{Empty, Full};
use serde::{Deserialize, Serialize};
use tower::Service;
use engineioxide::sid_generator::Sid;

/// An OpenPacket is used to initiate a connection
#[derive(Debug, Serialize, Deserialize, PartialEq, PartialOrd)]
#[serde(rename_all = "camelCase")]
struct OpenPacket {
sid: String,
upgrades: Vec<String>,
ping_interval: u64,
ping_timeout: u64,
max_payload: u64,
}

#[derive(Debug)]
struct Client;
impl EngineIoHandler for Client {
type Data = ();

fn on_connect(&self, _: &Socket<Self>) {}

fn on_disconnect(&self, _: &Socket<Self>) {}

fn on_message(&self, msg: String, socket: &Socket<Self>) {
socket.emit(msg).unwrap();
}

fn on_binary(&self, data: Vec<u8>, socket: &Socket<Self>) {
socket.emit_binary(data).unwrap();
}
}

fn create_open_poll_req() -> Request<http_body::Empty<Bytes>> {
http::Request::builder()
.method(http::Method::GET)
.uri("http://localhost:3000/engine.io/?EIO=4&transport=polling&t=NQ")
.body(Empty::new())
.unwrap()
}

async fn prepare_ping_pong(mut svc: EngineIoService<Client>) -> Sid {
let mut res = svc.call(create_open_poll_req()).await.unwrap();
let body = hyper::body::aggregate(res.body_mut()).await.unwrap();
let body: String = String::from_utf8(body.chunk().to_vec())
.unwrap()
.chars()
.skip(1)
.collect();
let open_packet: OpenPacket = serde_json::from_str(&body).unwrap();
let sid = open_packet.sid;
Sid::from_str(&sid).unwrap()
}
fn create_poll_req(sid: Sid) -> Request<http_body::Empty<Bytes>> {
http::Request::builder()
.method(http::Method::GET)
.uri(format!(
"http://localhost:3000/engine.io/?EIO=4&transport=polling&t=NQ&sid={sid}"
))
.body(Empty::new())
.unwrap()
}
fn create_post_req(sid: Sid) -> Request<http_body::Full<Bytes>> {
http::Request::builder()
.method(http::Method::POST)
.uri(format!(
"http://localhost:3000/engine.io/?EIO=4&transport=polling&t=NQ&sid={sid}"
))
.body(Full::new("4abcabc".to_owned().into()))
.unwrap()
}
fn create_post_bin_req(sid: Sid) -> Request<http_body::Full<Bytes>> {
http::Request::builder()
.method(http::Method::POST)
.uri(format!(
"http://localhost:3000/engine.io/?EIO=4&transport=polling&t=NQ&sid={sid}"
))
.body(Full::new("bYWJjYmFj".to_owned().into()))
.unwrap()
}
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("polling open request", |b| {
let client = Arc::new(Client);
let svc = EngineIoService::new(client);
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter_batched(
create_open_poll_req,
|req| svc.clone().call(black_box(req)),
criterion::BatchSize::SmallInput,
);
});

c.bench_function("ping/pong text", |b| {
let client = Arc::new(Client);
let svc = EngineIoService::new(client);
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter_custom(|r| {
let svc = svc.clone();
async move {
futures::future::join_all((0..r).map(|_| async {
let sid = prepare_ping_pong(svc.clone()).await;
let poll_req = create_poll_req(sid);
let post_req = create_post_req(sid);
let start = std::time::Instant::now();
let (a, b) = futures::future::join(
svc.clone().call(black_box(poll_req)),
svc.clone().call(black_box(post_req)),
)
.await;
a.unwrap();
b.unwrap();
start.elapsed()
}))
.await
.iter()
.sum::<Duration>()
/ r as u32
}
});
});

c.bench_function("ping/pong binary", |b| {
let client = Arc::new(Client);
let svc = EngineIoService::new(client);
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter_custom(|r| {
let svc = svc.clone();
async move {
futures::future::join_all((0..r).map(|_| async {
let sid = prepare_ping_pong(svc.clone()).await;
let poll_req = create_poll_req(sid);
let post_req = create_post_bin_req(sid);
let start = std::time::Instant::now();
let (a, b) = futures::future::join(
svc.clone().call(black_box(poll_req)),
svc.clone().call(black_box(post_req)),
)
.await;
a.unwrap();
b.unwrap();
start.elapsed()
}))
.await
.iter()
.sum::<Duration>()
/ r as u32
}
});
});
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
8 changes: 7 additions & 1 deletion engineioxide/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ impl<B> ResponseBody<B> {
}
}

impl<B> Default for ResponseBody<B> {
fn default() -> Self {
Self::empty_response()
}
}

#[pin_project(project = BodyProj)]
enum ResponseBodyInner<B> {
EmptyResponse,
Expand Down Expand Up @@ -58,7 +64,7 @@ where
match self.project().inner.project() {
BodyProj::EmptyResponse => std::task::Poll::Ready(None),
BodyProj::Body { body } => body.poll_data(cx),
BodyProj::CustomBody { body } => body.poll_data(cx).map_err(|err| match err {})
BodyProj::CustomBody { body } => body.poll_data(cx).map_err(|err| match err {}),
}
}

Expand Down
Loading

0 comments on commit 84eda00

Please sign in to comment.