Skip to content

Commit

Permalink
feat: websocket implementation (#214)
Browse files Browse the repository at this point in the history
* feat: websocket implementation

* chore: split up http platforms

* feat: support binary data from websockets

* fix: handle multiple messages on a connection

* chore: update lockfile
  • Loading branch information
elcharitas authored Dec 14, 2024
1 parent 0af3403 commit 3134da9
Show file tree
Hide file tree
Showing 12 changed files with 2,106 additions and 263 deletions.
1,124 changes: 973 additions & 151 deletions Cargo.lock

Large diffs are not rendered by default.

1,020 changes: 934 additions & 86 deletions crates/Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ pub mod prelude {
pub use crate::macros::*;
pub use ngyn_hyper::HyperApplication;
pub use ngyn_shared::{
core::{engine::NgynEngine, handler::*},
core::{
engine::{NgynEngine, NgynHttpEngine},
handler::*,
},
server::{
Body, JsonResponse, JsonResult, NgynContext, NgynRequest, NgynResponse, Param, Query,
ToBytes, Transducer,
Expand Down
4 changes: 2 additions & 2 deletions crates/hyper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::{service::service_fn, Request};
use hyper_util::rt::TokioIo;
use ngyn_shared::core::engine::{NgynPlatform, PlatformData};
use ngyn_shared::core::engine::{NgynHttpPlatform, PlatformData};
use std::sync::Arc;
use tokio::net::TcpListener;

Expand All @@ -13,7 +13,7 @@ pub struct HyperApplication {
data: PlatformData,
}

impl NgynPlatform for HyperApplication {
impl NgynHttpPlatform for HyperApplication {
fn data_mut(&mut self) -> &mut PlatformData {
&mut self.data
}
Expand Down
45 changes: 29 additions & 16 deletions crates/shared/src/core/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl PlatformData {
/// * `path` - The path of the route.
/// * `method` - The HTTP method of the route.
/// * `handler` - The handler function for the route.
pub(self) fn add_route(&mut self, path: &str, method: Option<Method>, handler: RouteHandler) {
pub fn add_route(&mut self, path: &str, method: Option<Method>, handler: RouteHandler) {
let method = method
.map(|method| method.to_string())
.unwrap_or_else(|| "{METHOD}".to_string());
Expand Down Expand Up @@ -104,9 +104,11 @@ pub trait NgynPlatform: Default {
fn data_mut(&mut self) -> &mut PlatformData;
}

impl<T: NgynPlatform> NgynEngine for T {}
pub trait NgynHttpPlatform: Default {
fn data_mut(&mut self) -> &mut PlatformData;
}

pub trait NgynEngine: NgynPlatform {
pub trait NgynHttpEngine: NgynEngine {
/// Adds a route to the application.
///
/// ### Arguments
Expand All @@ -130,10 +132,6 @@ pub trait NgynEngine: NgynPlatform {
.add_route(path, Some(method), handler.into());
}

fn any(&mut self, path: &str, handler: impl Into<RouteHandler>) {
self.data_mut().add_route(path, None, handler.into());
}

/// Adds a new route to the `NgynApplication` with the `Method::Get`.
fn get(&mut self, path: &str, handler: impl Into<RouteHandler>) {
self.route(path, Method::GET, handler.into())
Expand Down Expand Up @@ -164,15 +162,6 @@ pub trait NgynEngine: NgynPlatform {
self.route(path, Method::HEAD, handler.into())
}

/// Adds a middleware to the application.
///
/// ### Arguments
///
/// * `middleware` - The middleware to add.
fn use_middleware(&mut self, middleware: impl NgynMiddleware + 'static) {
self.data_mut().add_middleware(Box::new(middleware));
}

/// Sets up static file routes.
///
/// This is great for apps tha would want to output files in a specific folder.
Expand All @@ -199,6 +188,21 @@ pub trait NgynEngine: NgynPlatform {
}
Ok(())
}
}

pub trait NgynEngine: NgynPlatform {
fn any(&mut self, path: &str, handler: impl Into<RouteHandler>) {
self.data_mut().add_route(path, None, handler.into());
}

/// Adds a middleware to the application.
///
/// ### Arguments
///
/// * `middleware` - The middleware to add.
fn use_middleware(&mut self, middleware: impl NgynMiddleware + 'static) {
self.data_mut().add_middleware(Box::new(middleware));
}

/// Sets the state of the application to any value that implements [`AppState`].
///
Expand All @@ -210,6 +214,15 @@ pub trait NgynEngine: NgynPlatform {
}
}

impl<T: NgynHttpPlatform> NgynPlatform for T {
fn data_mut(&mut self) -> &mut PlatformData {
self.data_mut()
}
}

impl<T: NgynPlatform> NgynEngine for T {}
impl<T: NgynHttpPlatform> NgynHttpEngine for T {}

#[cfg(test)]
mod tests {
use http::StatusCode;
Expand Down
6 changes: 3 additions & 3 deletions crates/swagger/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use ngyn::shared::{
core::{
engine::{NgynEngine, PlatformData},
engine::{NgynHttpEngine, PlatformData},
handler::handler,
},
server::{NgynContext, NgynResponse},
Expand Down Expand Up @@ -147,7 +147,7 @@ pub fn build_specs_with_config(config: &SwaggerConfig, _platform: &mut PlatformD
})
}

pub trait NgynEngineSwagger: NgynEngine {
pub trait NgynEngineSwagger: NgynHttpEngine {
fn use_swagger(&mut self, config: SwaggerConfig) {
let template = include_str!("templates/swagger.html");
let docs_body = template
Expand All @@ -169,7 +169,7 @@ pub trait NgynEngineSwagger: NgynEngine {
}
}

impl<T: NgynEngine> NgynEngineSwagger for T {}
impl<T: NgynHttpEngine> NgynEngineSwagger for T {}

// fn merge(a: &mut Value, b: Value) {
// match (a, b) {
Expand Down
4 changes: 2 additions & 2 deletions crates/vercel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ngyn_shared::{
core::engine::{NgynPlatform, PlatformData},
core::engine::{NgynHttpPlatform, PlatformData},
server::response::ReadBytes,
};
use vercel_runtime::{Body, Error, Request, Response as VercelResponse};
Expand All @@ -9,7 +9,7 @@ pub struct VercelApplication {
data: PlatformData,
}

impl NgynPlatform for VercelApplication {
impl NgynHttpPlatform for VercelApplication {
fn data_mut(&mut self) -> &mut PlatformData {
&mut self.data
}
Expand Down
18 changes: 18 additions & 0 deletions crates/ws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "ngyn-websocket"
version = "0.1.1"
edition = "2021"
description = "Websocket Runtime Platform for ngyn web framework"
license = "MIT"
documentation = "https://ngyn.rs/docs"
repository = "https://github.com/ngyn-rs/ngyn"
homepage = "https://ngyn.rs"
rust-version = "1.75"
keywords = ["ngyn", "run-time", "platform", "websockets", "framework"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ngyn_shared = { version = "0.4", path = "../shared" }
tokio = { version = "1", features = ["full"] }
websocket = "0.27.1"
119 changes: 119 additions & 0 deletions crates/ws/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use core::fmt;
use ngyn_shared::core::engine::{NgynPlatform, PlatformData};
use ngyn_shared::core::handler::RouteHandler;
use ngyn_shared::server::response::ReadBytes;
use ngyn_shared::server::NgynRequest;
use std::io::ErrorKind;
use std::net::ToSocketAddrs;
use std::sync::{Arc, Mutex};
use websocket::sync::Writer;
use websocket::Message;
use websocket::{sync::Server, OwnedMessage};

#[derive(Default)]
pub struct WebsocketApplication {
data: PlatformData,
clients: Arc<Mutex<Vec<Writer<std::net::TcpStream>>>>,
}

impl NgynPlatform for WebsocketApplication {
fn data_mut(&mut self) -> &mut PlatformData {
&mut self.data
}
}

impl WebsocketApplication {
/// add a route to handle
pub fn route(&mut self, path: &str, handler: impl Into<RouteHandler>) {
self.data_mut().add_route(path, None, handler.into());
}

// Broadcast message to all connected clients
pub fn broadcast(&self, message: &str) -> Result<(), websocket::WebSocketError> {
let mut clients = self
.clients
.lock()
.map_err(|_| websocket::WebSocketError::IoError(ErrorKind::InvalidData.into()))?;

for client in clients.iter_mut() {
client.send_message(&OwnedMessage::Text(message.to_string()))?;
}

Ok(())
}

/// Listens for incoming connections and serves the application.
///
/// ### Arguments
///
/// * `addr` - The address to listen on.
///
/// ### Returns
///
/// A `Result` indicating success or failure.
pub fn listen<A: ToSocketAddrs + fmt::Debug>(
self,
addr: A,
) -> Result<(), Box<dyn std::error::Error>> {
let server = Server::bind(addr)?;
let data_handler = Arc::new(self.data);

for request in server.filter_map(Result::ok) {
let path = request.uri();
let clients = Arc::clone(&self.clients);
let data_handler = data_handler.clone();

tokio::spawn(async move {
if let Ok(client) = request.accept() {
let (mut receiver, mut sender) = client.split().unwrap();
for message in receiver.incoming_messages() {
match message {
Ok(OwnedMessage::Text(_)) | Ok(OwnedMessage::Binary(_)) => {
// Infallible at this point, so we can safely call `unwrap`
let body = match message.unwrap() {
OwnedMessage::Binary(data) => data,
OwnedMessage::Text(data) => data.into(),
_ => return,
};
let mut req = NgynRequest::new(body);
// default to index url if parsing fails
*req.uri_mut() = path.parse().unwrap_or_default();

let mut response = data_handler.respond(req).await;

if let Ok(data) = response.read_bytes().await {
let message =
if response.headers().get("Content-Type").is_none() {
Message::text(String::from_utf8_lossy(&data))
} else {
Message::binary(data.to_vec())
};
sender.send_message(&message).unwrap();
}
}
Ok(OwnedMessage::Close(_)) => {
let message = Message::close();
sender.send_message(&message).unwrap();
break;
}
Ok(OwnedMessage::Ping(data)) => {
let message = Message::pong(data);
sender.send_message(&message).unwrap();
break;
}
Err(_) => break,
_ => {}
}
}

// Add client to the list of connected clients
if let Ok(mut client_list) = clients.lock() {
client_list.push(sender);
}
}
});
}

Ok(())
}
}
2 changes: 0 additions & 2 deletions examples/basic_app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,5 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = { version = "1.6.0", features = ["attributes"] }
ngyn = { version = "0.4.4", path = "../../crates/core" }
ngyn-hyper = { version = "0.1.0" }
tokio = { version = "1", features = ["full"] }
9 changes: 9 additions & 0 deletions examples/websocket/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "websocket"
version = "0.1.0"
edition = "2021"

[dependencies]
ngyn = { version = "0.4.4", path = "../../crates/core" }
ngyn-websocket = { version = "0.1.0", path = "../../crates/ws" }
tokio = { version = "1", features = ["full"] }
13 changes: 13 additions & 0 deletions examples/websocket/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use ngyn::prelude::*;
use ngyn_websocket::WebsocketApplication;

#[tokio::main]
async fn main() {
let mut app = WebsocketApplication::default();

app.any("/", handler(|_| "Hello"));

println!("Starting server at ws://127.0.0.1:8080");

let _ = app.listen("0.0.0.0:8080");
}

0 comments on commit 3134da9

Please sign in to comment.