Skip to content

Commit

Permalink
feat: remove config from socket (#331)
Browse files Browse the repository at this point in the history
* feat(socketio/extensions): use `RwLock<HashMap>` rather than `DashMap`

* chore(bench): add bencher ci

* fix: socketioxide benches with `Bytes`

* chore(bench): fix ci name

* chore(bench): add RUSTFLAG for testing

* fix: engineioxide benches

* chore(bench): remove matrix test

* chore(bench): add groups

* chore(bench): improve extensions bench

* feat(socketio/extract): refactor extract mod

* feat(socketio/extract): add `(Maybe)(Http)Extension` extractors

* docs(example): update examples with `Extension` extractor

* test(socketio/extract): add tests for `Extension` and `MaybeExtension`

* docs(example) fmt chat example

* test(socketio): fix extractors test

* doc(socketio): improve doc for socketioxide

* test(socketio): increase timeout

* doc(socketio): improve doc

* feat(io): store io client in socketdata so it is possible to retrieve it anywhere

* feat(state): per client state

* doc(socketio): improve doc

* test: add state for client init

* test

* doc(examples): fix examples

* doc(examples): fix examples

* doc(examples): fix loco example

* doc(examples): fix state example

* test: revert excessive timeout due to previous state collision

* feat(socket): remove config form socket
  • Loading branch information
Totodore authored Jun 7, 2024
1 parent 3bb916e commit af25013
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 33 deletions.
3 changes: 2 additions & 1 deletion socketioxide/src/ack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ impl AckInnerStream {
return AckInnerStream::Stream { rxs };
}

let duration = duration.unwrap_or_else(|| sockets.first().unwrap().config.ack_timeout);
let duration =
duration.unwrap_or_else(|| sockets.first().unwrap().get_io().config().ack_timeout);
for socket in sockets {
let rx = socket.send_with_ack(packet.clone());
rxs.push(AckResultWithId {
Expand Down
13 changes: 4 additions & 9 deletions socketioxide/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ use crate::{
use crate::{ProtocolVersion, SocketIo};

pub struct Client<A: Adapter> {
pub(crate) config: Arc<SocketIoConfig>,
pub(crate) config: SocketIoConfig,
ns: RwLock<HashMap<Cow<'static, str>, Arc<Namespace<A>>>>,
#[cfg(feature = "state")]
pub(crate) state: state::TypeMap![Send + Sync],
}

impl<A: Adapter> Client<A> {
pub fn new(
config: Arc<SocketIoConfig>,
config: SocketIoConfig,
#[cfg(feature = "state")] mut state: state::TypeMap![Send + Sync],
) -> Self {
#[cfg(feature = "state")]
Expand All @@ -57,13 +57,8 @@ impl<A: Adapter> Client<A> {

if let Some(ns) = self.get_ns(ns_path) {
let esocket = esocket.clone();
let config = self.config.clone();
tokio::spawn(async move {
if ns
.connect(esocket.id, esocket.clone(), auth, config)
.await
.is_ok()
{
if ns.connect(esocket.id, esocket.clone(), auth).await.is_ok() {
// cancel the connect timeout task for v5
if let Some(tx) = esocket.data.connect_recv_tx.lock().unwrap().take() {
tx.send(()).ok();
Expand Down Expand Up @@ -398,7 +393,7 @@ mod test {
..Default::default()
};
let client = Client::<LocalAdapter>::new(
std::sync::Arc::new(config),
config,
#[cfg(feature = "state")]
Default::default(),
);
Expand Down
9 changes: 4 additions & 5 deletions socketioxide/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl<A: Adapter> SocketIoBuilder<A> {
self.config.engine_config = self.engine_config_builder.build();

let (layer, client) = SocketIoLayer::from_config(
Arc::new(self.config),
self.config,
#[cfg(feature = "state")]
self.state,
);
Expand All @@ -203,7 +203,7 @@ impl<A: Adapter> SocketIoBuilder<A> {

let (svc, client) = SocketIoService::with_config_inner(
NotFoundService,
Arc::new(self.config),
self.config,
#[cfg(feature = "state")]
self.state,
);
Expand All @@ -218,7 +218,7 @@ impl<A: Adapter> SocketIoBuilder<A> {

let (svc, client) = SocketIoService::with_config_inner(
svc,
Arc::new(self.config),
self.config,
#[cfg(feature = "state")]
self.state,
);
Expand Down Expand Up @@ -895,10 +895,9 @@ mod tests {
let (_, io) = SocketIo::builder().build_svc();
io.ns("/", || {});
let socket = Socket::new_dummy(sid, Box::new(|_, _| {}));
let config = SocketIoConfig::default().into();
io.0.get_ns("/")
.unwrap()
.connect(sid, socket, None, config)
.connect(sid, socket, None)
.await
.ok();

Expand Down
4 changes: 2 additions & 2 deletions socketioxide/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ impl<A: Adapter> Clone for SocketIoLayer<A> {

impl<A: Adapter> SocketIoLayer<A> {
pub(crate) fn from_config(
config: Arc<SocketIoConfig>,
config: SocketIoConfig,
#[cfg(feature = "state")] state: state::TypeMap![Send + Sync],
) -> (Self, Arc<Client<A>>) {
let client = Arc::new(Client::new(
config.clone(),
config,
#[cfg(feature = "state")]
state,
));
Expand Down
4 changes: 1 addition & 3 deletions socketioxide/src/ns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::{
handler::{BoxedConnectHandler, ConnectHandler, MakeErasedHandler},
packet::{Packet, PacketData},
socket::{DisconnectReason, Socket},
SocketIoConfig,
};
use crate::{client::SocketData, errors::AdapterError};
use engineioxide::sid::Sid;
Expand Down Expand Up @@ -47,9 +46,8 @@ impl<A: Adapter> Namespace<A> {
sid: Sid,
esocket: Arc<engineioxide::Socket<SocketData<A>>>,
auth: Option<String>,
config: Arc<SocketIoConfig>,
) -> Result<(), ConnectFail> {
let socket: Arc<Socket<A>> = Socket::new(sid, self.clone(), esocket.clone(), config).into();
let socket: Arc<Socket<A>> = Socket::new(sid, self.clone(), esocket.clone()).into();

if let Err(e) = self.handler.call_middleware(socket.clone(), &auth).await {
#[cfg(feature = "tracing")]
Expand Down
4 changes: 3 additions & 1 deletion socketioxide/src/operators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,9 @@ impl<A: Adapter> ConfOperators<'_, A> {
return Err(e.with_value(data).into());
}
};
let timeout = self.timeout.unwrap_or(self.socket.config.ack_timeout);
let timeout = self
.timeout
.unwrap_or_else(|| self.socket.get_io().config().ack_timeout);
let packet = self.get_packet(event, data)?;
let rx = self.socket.send_with_ack_permit(packet, permit);
let stream = AckInnerStream::send(rx, timeout, self.socket.id);
Expand Down
2 changes: 1 addition & 1 deletion socketioxide/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl<A: Adapter, S: Clone> SocketIoService<S, A> {
/// Creates a new [`EngineIoService`] with a custom inner service and a custom config.
pub(crate) fn with_config_inner(
inner: S,
config: Arc<SocketIoConfig>,
config: SocketIoConfig,
#[cfg(feature = "state")] state: state::TypeMap![Send + Sync],
) -> (Self, Arc<Client<A>>) {
let engine_config = config.engine_config.clone();
Expand Down
23 changes: 12 additions & 11 deletions socketioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
ns::Namespace,
operators::{BroadcastOperators, ConfOperators, RoomParam},
packet::{BinaryPacket, Packet, PacketData},
AckError, SocketIo, SocketIoConfig,
AckError, SocketIo,
};
use crate::{
client::SocketData,
Expand Down Expand Up @@ -128,7 +128,6 @@ impl<'a> PermitExt<'a> for Permit<'a> {
/// It is used to send and receive messages from the client, join and leave rooms, etc.
/// The socket struct itself should not be used directly, but through a [`SocketRef`](crate::extract::SocketRef).
pub struct Socket<A: Adapter = LocalAdapter> {
pub(crate) config: Arc<SocketIoConfig>,
pub(crate) ns: Arc<Namespace<A>>,
message_handlers: RwLock<HashMap<Cow<'static, str>, BoxedMessageHandler<A>>>,
disconnect_handler: Mutex<Option<BoxedDisconnectHandler<A>>>,
Expand All @@ -154,7 +153,6 @@ impl<A: Adapter> Socket<A> {
sid: Sid,
ns: Arc<Namespace<A>>,
esocket: Arc<engineioxide::Socket<SocketData<A>>>,
config: Arc<SocketIoConfig>,
) -> Self {
Self {
ns,
Expand All @@ -166,7 +164,6 @@ impl<A: Adapter> Socket<A> {
id: sid,
#[cfg(feature = "extensions")]
extensions: Extensions::new(),
config,
esocket,
}
}
Expand Down Expand Up @@ -398,7 +395,7 @@ impl<A: Adapter> Socket<A> {
let data = serde_json::to_value(data)?;
let packet = Packet::event(self.ns(), event.into(), data);
let rx = self.send_with_ack_permit(packet, permit);
let stream = AckInnerStream::send(rx, self.config.ack_timeout, self.id);
let stream = AckInnerStream::send(rx, self.get_io().config().ack_timeout, self.id);
Ok(AckStream::<V>::from(stream))
}

Expand Down Expand Up @@ -827,13 +824,17 @@ impl<A: Adapter> PartialEq for Socket<A> {
impl<A: Adapter> Socket<A> {
/// Creates a dummy socket for testing purposes
pub fn new_dummy(sid: Sid, ns: Arc<Namespace<A>>) -> Socket<A> {
use crate::client::Client;
use crate::io::SocketIoConfig;

let close_fn = Box::new(move |_, _| ());
let s = Socket::new(
sid,
ns,
engineioxide::Socket::new_dummy(sid, close_fn),
Arc::new(SocketIoConfig::default()),
);
let config = SocketIoConfig::default();
let io = SocketIo::from(Arc::new(Client::<A>::new(
config,
std::default::Default::default(),
)));
let s = Socket::new(sid, ns, engineioxide::Socket::new_dummy(sid, close_fn));
s.esocket.data.io.set(io).unwrap();
s.set_connected(true);
s
}
Expand Down

0 comments on commit af25013

Please sign in to comment.