Skip to content

Commit

Permalink
Merge pull request #9 from akasamq/remove-glommio-rt
Browse files Browse the repository at this point in the history
refactor: remove glommio runtime
  • Loading branch information
TheWaWaR authored Nov 18, 2023
2 parents 54cd466 + fa35930 commit b29749a
Show file tree
Hide file tree
Showing 24 changed files with 130 additions and 1,009 deletions.
4 changes: 1 addition & 3 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@

Akasa 是一个 Rust 写的高性能,低延迟,高度可扩展的 MQTT 服务器。

Akasa 用 [glommio][glommio] 来实现高性能低延迟的网络 IO. 它底层的 MQTT 协议消息编解码器 ([mqtt-proto][mqtt-proto]) 是为了高性能和 async 环境而精心设计实现的。
它底层的 MQTT 协议消息编解码器 ([mqtt-proto][mqtt-proto]) 是为了高性能和 async 环境而精心设计实现的。

## 特性
- [x] 完全支持 MQTT v3.1/v3.1.1/v5.0
- [x] 支持 TLS (包括双向认证)
- [x] 支持 WebSocket (包括 TLS 支持)
- [x] 支持 [Proxy Protocol V2][proxy-protocol]
- [x] 使用 `io_uring` ([glommio][glommio]) 来实现高性能低延迟 IO (非 Linux 环境可以用 tokio)
- [x] 使用 [Hook trait][hook-trait] 来扩展服务器
- [x] 用一个密码文件来支持简单的认证
- [ ] 基于 Raft 的服务器集群 (*敬请期待*)
Expand Down Expand Up @@ -95,7 +94,6 @@ Akasa 会有一个企业版本,企业版中的额外功能包括:
[mqtt-proto]: https://github.com/akasamq/mqtt-proto
[mqtt-proto-fuzz]: https://github.com/akasamq/mqtt-proto/tree/master/fuzz
[proxy-protocol]: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
[glommio]: https://github.com/DataDog/glommio
[bsl]: https://mariadb.com/bsl-faq-mariadb/
[hook-trait]: https://github.com/akasamq/akasa/blob/5ade2d788d9a919671f81b01d720155caf8e4e2d/akasa-core/src/hook.rs#L43
[tensorflow]: https://blog.tensorflow.org/2020/09/supercharging-tensorflowjs-webassembly.html
Expand Down
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ English | [简体中文](README-CN.md)

Akasa is a high performance, low latency and high extendable MQTT server in Rust.

It uses [glommio][glommio] for high performance and low latency network IO. The underlying MQTT protocol message codec ([mqtt-proto][mqtt-proto]) is carefully crafted for high performance and async environment.
The underlying MQTT protocol message codec ([mqtt-proto][mqtt-proto]) is carefully crafted for high performance and async environment.

## Features
- [x] Full support MQTT v3.1/v3.1.1/v5.0
- [x] Support TLS (include two-way authentication)
- [x] Support WebSocket (include TLS support)
- [x] Support [Proxy Protocol V2][proxy-protocol]
- [x] Use `io_uring` ([glommio][glommio]) for high performance low latency IO (can use tokio on non-Linux OS)
- [x] Use a [Hook trait][hook-trait] to extend the server
- [x] Simple password file based authentication
- [ ] Raft based cluster (*coming soon*)
Expand Down Expand Up @@ -100,7 +99,6 @@ Akasa will have an enterprise edition. In this edition, it provides:
[mqtt-proto]: https://github.com/akasamq/mqtt-proto
[mqtt-proto-fuzz]: https://github.com/akasamq/mqtt-proto/tree/master/fuzz
[proxy-protocol]: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
[glommio]: https://github.com/DataDog/glommio
[bsl]: https://mariadb.com/bsl-faq-mariadb/
[hook-trait]: https://github.com/akasamq/akasa/blob/5ade2d788d9a919671f81b01d720155caf8e4e2d/akasa-core/src/hook.rs#L43
[tensorflow]: https://blog.tensorflow.org/2020/09/supercharging-tensorflowjs-webassembly.html
Expand Down
6 changes: 2 additions & 4 deletions akasa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ parking_lot = "0.12.1"
serde = { version = "1.0.147", features = ["derive"] }
thiserror = "1.0.38"
tokio = { version = "1.23.0", features = ["full"] }
tokio-tungstenite = "0.20.1"
tokio-openssl = "0.6.3"
uuid = { version = "1.2.2", features = ["v4"] }
rand = { version = "0.8.5", features = ["getrandom"] }
ahash = "0.8.3"
Expand All @@ -35,10 +37,6 @@ base64 = "0.21.0"
ring = "0.16"
crc32c = "0.6.3"
openssl = "0.10.51"
async-tungstenite = "0.21.0"

[target.'cfg(target_os = "linux")'.dependencies]
glommio = { version = "0.8.0" }

[dev-dependencies]
futures-sink = "0.3.26"
Expand Down
17 changes: 9 additions & 8 deletions akasa-core/src/protocols/mqtt/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use std::time::{Duration, Instant};

use parking_lot::RwLock;

use crate::state::{ClientId, ControlMessage, Executor, GlobalState};
use crate::state::{ClientId, ControlMessage, GlobalState};

pub(crate) fn start_keep_alive_timer<E: Executor>(
pub(crate) fn start_keep_alive_timer(
keep_alive: u16,
client_id: ClientId,
last_packet_time: &Arc<RwLock<Instant>>,
executor: &E,
global: &Arc<GlobalState>,
) -> io::Result<()> {
// FIXME: if kee_alive is zero, set a default keep_alive value from config
Expand All @@ -19,7 +18,7 @@ pub(crate) fn start_keep_alive_timer<E: Executor>(
log::debug!("{} keep alive: {:?}", client_id, half_interval * 2);
let last_packet_time = Arc::clone(last_packet_time);
let global = Arc::clone(global);
if let Err(err) = executor.spawn_interval(move || {
let action_gen = move || {
// Need clone twice: https://stackoverflow.com/a/68462908/1274372
let last_packet_time = Arc::clone(&last_packet_time);
let global = Arc::clone(&global);
Expand All @@ -45,10 +44,12 @@ pub(crate) fn start_keep_alive_timer<E: Executor>(
}
None
}
}) {
log::error!("spawn executor keep alive timer failed: {:?}", err);
return Err(err);
}
};
tokio::spawn(async move {
while let Some(duration) = action_gen().await {
tokio::time::sleep(duration).await;
}
});
}
Ok(())
}
6 changes: 2 additions & 4 deletions akasa-core/src/protocols/mqtt/online_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ use flume::{
r#async::{RecvStream, SendSink},
Sender,
};
use futures_lite::{
io::{AsyncRead, AsyncWrite},
Stream,
};
use futures_lite::Stream;
use futures_sink::Sink;
use hashbrown::HashMap;
use mqtt_proto::{v3, v5, GenericPollPacket, GenericPollPacketState, PollHeader, QoS, VarBytes};
use tokio::io::{AsyncRead, AsyncWrite};

use crate::hook::{handle_request, Hook, HookAction, HookRequest, HookResponse};
use crate::state::{ClientId, ClientReceiver, ControlMessage, GlobalState, NormalMessage};
Expand Down
40 changes: 10 additions & 30 deletions akasa-core/src/protocols/mqtt/v3/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use flume::{Receiver, Sender};
use futures_lite::{
io::{AsyncRead, AsyncWrite},
FutureExt,
};
use futures_lite::FutureExt;
use hashbrown::HashMap;
use mqtt_proto::{
v3::{
Expand All @@ -17,6 +14,7 @@ use mqtt_proto::{
},
Error, Pid, Protocol, QoS, QosPid,
};
use tokio::io::{AsyncRead, AsyncWrite};

use crate::hook::{
handle_request, Hook, HookAction, HookRequest, HookResponse, LockedHookContext, PublishAction,
Expand All @@ -25,9 +23,7 @@ use crate::hook::{
use crate::protocols::mqtt::{
BroadcastPackets, OnlineLoop, OnlineSession, PendingPackets, WritePacket,
};
use crate::state::{
ClientId, ClientReceiver, ControlMessage, Executor, GlobalState, NormalMessage,
};
use crate::state::{ClientId, ClientReceiver, ControlMessage, GlobalState, NormalMessage};

use super::{
packet::{
Expand All @@ -45,7 +41,6 @@ use super::{
#[allow(clippy::too_many_arguments)]
pub async fn handle_connection<
T: AsyncRead + AsyncWrite + Unpin,
E: Executor,
H: Hook + Clone + Send + Sync + 'static,
>(
conn: T,
Expand All @@ -54,7 +49,6 @@ pub async fn handle_connection<
protocol: Protocol,
timeout_receiver: Receiver<()>,
hook_handler: H,
executor: E,
global: Arc<GlobalState>,
) -> io::Result<()> {
match handle_online(
Expand All @@ -64,35 +58,31 @@ pub async fn handle_connection<
protocol,
timeout_receiver,
&hook_handler,
&executor,
&global,
)
.await
{
Ok(Some((session, receiver))) => {
log::info!(
"executor {:03}, {}({}) go to offline, total {} clients ({} online)",
executor.id(),
"{}({}) go to offline, total {} clients ({} online)",
session.client_id,
peer,
global.clients_count(),
global.online_clients_count(),
);
executor.spawn_local(handle_offline(session, receiver, global));
tokio::spawn(handle_offline(session, receiver, global));
}
Ok(None) => {
log::info!(
"executor {:03}, {} finished, total {} clients ({} online)",
executor.id(),
"{} finished, total {} clients ({} online)",
peer,
global.clients_count(),
global.online_clients_count(),
);
}
Err(err) => {
log::info!(
"executor {:03}, {} error: {}, total {} clients ({} online)",
executor.id(),
"{} error: {}, total {} clients ({} online)",
peer,
err,
global.clients_count(),
Expand All @@ -107,7 +97,6 @@ pub async fn handle_connection<
#[allow(clippy::too_many_arguments)]
async fn handle_online<
T: AsyncRead + AsyncWrite + Unpin,
E: Executor,
H: Hook + Clone + Send + Sync + 'static,
>(
mut conn: T,
Expand All @@ -116,7 +105,6 @@ async fn handle_online<
protocol: Protocol,
timeout_receiver: Receiver<()>,
hook_handler: &H,
executor: &E,
global: &Arc<GlobalState>,
) -> io::Result<Option<(Session, ClientReceiver)>> {
let mut session = Session::new(&global.config, peer);
Expand All @@ -143,15 +131,8 @@ async fn handle_online<
before_connect_hook(peer, &packet, hook_handler, global).await?;
}

let session_present = handle_connect(
&mut session,
&mut receiver,
packet,
&mut conn,
executor,
global,
)
.await?;
let session_present =
handle_connect(&mut session, &mut receiver, packet, &mut conn, global).await?;

if !session.connected {
log::info!("{} not connected", session.peer);
Expand All @@ -169,8 +150,7 @@ async fn handle_online<

let receiver = receiver.expect("receiver");
log::info!(
"executor {:03}, {} connected, total {} clients ({} online) ",
executor.id(),
"{} connected, total {} clients ({} online) ",
session.peer,
global.clients_count(),
global.online_clients_count(),
Expand Down
2 changes: 1 addition & 1 deletion akasa-core/src/protocols/mqtt/v3/packet/common.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::io;
use std::time::Instant;

use futures_lite::io::AsyncWrite;
use mqtt_proto::{
v3::{Packet, Publish},
QoS, QosPid,
};
use tokio::io::AsyncWrite;

use crate::protocols::mqtt::{get_unix_ts, PendingPacketStatus};
use crate::state::ClientId;
Expand Down
8 changes: 3 additions & 5 deletions akasa-core/src/protocols/mqtt/v3/packet/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,23 @@ use std::io;
use std::sync::Arc;
use std::time::Instant;

use futures_lite::io::AsyncWrite;
use mqtt_proto::{
v3::{Connack, Connect, ConnectReturnCode},
Protocol,
};
use tokio::io::AsyncWrite;

use crate::protocols::mqtt::{check_password, start_keep_alive_timer};
use crate::state::{AddClientReceipt, ClientReceiver, Executor, GlobalState};
use crate::state::{AddClientReceipt, ClientReceiver, GlobalState};

use super::super::Session;
use super::common::write_packet;

pub(crate) async fn handle_connect<T: AsyncWrite + Unpin, E: Executor>(
pub(crate) async fn handle_connect<T: AsyncWrite + Unpin>(
session: &mut Session,
receiver: &mut Option<ClientReceiver>,
packet: Connect,
conn: &mut T,
executor: &E,
global: &Arc<GlobalState>,
) -> io::Result<bool> {
log::debug!(
Expand Down Expand Up @@ -152,7 +151,6 @@ clean session : {}
session.keep_alive,
session.client_id,
&session.last_packet_time,
executor,
global,
)?;

Expand Down
Loading

0 comments on commit b29749a

Please sign in to comment.