Migrate Signaling Socket.io JS into Socketioxide Rust #347

Closed
aryabp opened this issue Jul 12, 2024 · 4 comments · Fixed by #353

@aryabp

aryabp commented Jul 12, 2024

DOMException: Failed to execute 'createAnswer' on 'RTCPeerConnection': PeerConnection cannot create an answer in a state other than have-remote-offer or have-local-pranswer.

I get this error when calling createAnswer(), but with the JS Socket.io server it runs fine.

Ref link : https://acidtango.com/thelemoncrunch/how-to-implement-a-video-conference-with-webrtc-and-node/
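
The error text refers to the signaling state of the `RTCPeerConnection`: per the WebRTC specification, `createAnswer()` is only valid in `have-remote-offer` or `have-local-pranswer`. A minimal Rust sketch of that rule follows; the enum and function names are illustrative only and do not come from any library:

```rust
/// Signaling states of an RTCPeerConnection, as named in the WebRTC spec.
/// Illustrative model only; these types are not part of any crate.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SignalingState {
    Stable,
    HaveLocalOffer,
    HaveRemoteOffer,
    HaveLocalPranswer,
    HaveRemotePranswer,
    Closed,
}

/// Returns true only for the two states in which `createAnswer()` is allowed.
fn can_create_answer(state: SignalingState) -> bool {
    matches!(
        state,
        SignalingState::HaveRemoteOffer | SignalingState::HaveLocalPranswer
    )
}
```

A peer that is still in `stable`, for example because the relayed offer never arrived or was never applied with `setRemoteDescription`, would hit exactly this exception.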

@aryabp added the bug (Something isn't working) label on Jul 12, 2024
@Totodore
Owner

Could you add a debug log of socketioxide please?
You have to enable the tracing feature in socketioxide and set the log level to debug.
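
A minimal sketch of what that setup could look like, assuming the `tracing` and `tracing-subscriber` crates. The helper name `init_debug_logging` is hypothetical, and the version in the Cargo.toml comment is deliberately left elided:

```rust
// Cargo.toml (assumption: keep whatever version you already depend on):
//   socketioxide = { version = "...", features = ["tracing"] }

use tracing::Level;
use tracing_subscriber::FmtSubscriber;

/// Installs a global subscriber that prints DEBUG-level events and above.
/// `init_debug_logging` is a hypothetical helper name, not a socketioxide API.
fn init_debug_logging() -> Result<(), tracing::subscriber::SetGlobalDefaultError> {
    let subscriber = FmtSubscriber::builder()
        .with_max_level(Level::DEBUG)
        .finish();
    tracing::subscriber::set_global_default(subscriber)
}
```

Note that `FmtSubscriber::default()` filters at INFO as far as I know, so debug events stay hidden unless the maximum level is raised explicitly.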

@aryabp
Author

aryabp commented Jul 12, 2024

There are no errors in the socketioxide code; the errors occur in the socket.io client code.

Here are some snippets:

main.rs

mod state;

use std::sync::{Arc, Mutex};

use axum::routing::get;
use serde::{Deserialize, Serialize};

use socketioxide::{
    extract::{Data, SocketRef, State},
    SocketIo,
};
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::cors::CorsLayer;
use tracing::info;
use tracing_subscriber::FmtSubscriber;

// Real-time chat

#[derive(Debug, Deserialize)]
struct MessageIn {
    room: String,
    text: String,
}

#[derive(Serialize)]
struct Messages {
    messages: Vec<state::Message>,
}

// Signaling

type RoomCounterStore = Arc<Mutex<Vec<String>>>;

#[derive(Serialize, Deserialize, Clone, Debug)]
struct SDP {
    sdp: String,
    #[serde(rename = "type")]
    r#type: String,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
struct WebRTC {
    #[serde(rename = "type")]
    r#type: String,
    sdp: SDP,
    room_id: String,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
struct ICE {
    room_id: String,
    label: i32,
    candidate: String,
}

async fn on_connect(socket: SocketRef) {
    info!("socket connected: {}", socket.id);

    // Real-time chat

    socket.on(
        "joinchat",
        |socket: SocketRef, Data::<String>(room), store: State<state::MessageStore>| async move {
            info!("Received joinchat: {:?}", room);
            let _ = socket.leave_all();
            let _ = socket.join(room.clone());
            let messages = store.get(&room).await;
            let _ = socket.emit("messages", Messages { messages });
        },
    );
    socket.on(
        "messagechat",
        |socket: SocketRef, Data::<MessageIn>(data), store: State<state::MessageStore>| async move {
            info!("Received messagechat: {:?}", data);

            let response = state::Message {
                text: data.text,
                user: format!("anon-{}", socket.id),
                date: chrono::Utc::now(),
            };

            store.insert(&data.room, response.clone()).await;

            let _ = socket.within(data.room).emit("messagechat", response);
        },
    );
}

async fn on_signal(socket: SocketRef) {
    info!("socket connected: {}", socket.id);

    // Signaling

    socket.on(
        "join",
        |socket: SocketRef, Data::<String>(room_id), count: State<RoomCounterStore>| async move {
            match count.lock() {
                Ok(mut e) => {
                    let x: usize = e.iter().filter(|x| **x == room_id).count();
                    if x == 0 {
                        info!(
                            "Creating room {:?} and emitting room_created socket event",
                            room_id
                        );
                        e.push(room_id.clone());
                        let _ = socket.leave_all();
                        let _ = socket.join(room_id.clone());
                        let _ = socket.emit("room_created", room_id);
                    } else if x == 1 {
                        info!(
                            "Joining room : {:?} and emitting room_joined socket event",
                            room_id
                        );
                        e.push(room_id.clone());
                        let _ = socket.leave_all();
                        let _ = socket.join(room_id.clone());
                        let _ = socket.emit("room_joined", room_id);
                    } else {
                        info!(
                            "Can't join room {:?}, emitting full_room socket event",
                            room_id
                        );
                        let _ = socket.emit("full_room", room_id);
                    }
                }
                Err(_) => {}
            }
        },
    );
    socket.on(
        "start_call",
        |socket: SocketRef, Data::<String>(room_id)| async move {
            info!(
                "Broadcasting start_call event to peers in room {:?}",
                room_id
            );
            let _ = socket
                .broadcast()
                .to(room_id)
                .emit("start_call", "whatever");
        },
    );
    socket.on(
        "webrtc_offer",
        |socket: SocketRef, Data::<WebRTC>(webrtc_sdp)| async move {
            info!(
                "Broadcasting webrtc_offer event to peers in room {:?}",
                webrtc_sdp.room_id
            );
            let _ = socket
                .broadcast()
                .to(webrtc_sdp.room_id)
                .emit("webrtc_offer", webrtc_sdp.sdp);
        },
    );
    socket.on(
        "webrtc_answer",
        |socket: SocketRef, Data::<WebRTC>(webrtc_sdp)| async move {
            info!(
                "Broadcasting webrtc_answer event to peers in room {:?}",
                webrtc_sdp.room_id
            );
            let _ = socket
                .broadcast()
                .to(webrtc_sdp.room_id)
                .emit("webrtc_answer", webrtc_sdp.sdp);
        },
    );
    socket.on(
        "webrtc_ice_candidate",
        |socket: SocketRef, Data::<ICE>(ice)| async move {
            let room_id = ice.room_id.clone();
            info!(
                "Broadcasting webrtc_ice_candidate event to peers in room {:?}",
                room_id
            );
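            // NOTE: the log above says `webrtc_ice_candidate`, but this emits
            // `webrtc_offer`, so peers listening on `webrtc_offer` receive
            // ICE candidates there.
            let _ = socket.broadcast().to(room_id).emit("webrtc_offer", ice);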
        },
    );
}

async fn handler(axum::extract::State(io): axum::extract::State<SocketIo>) {
    info!("handler called");
    let _ = io.emit("hello", "world");
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing::subscriber::set_global_default(FmtSubscriber::default())?;

    let messages = state::MessageStore::default();
    let roomcount = RoomCounterStore::default();
    let (layer, io) = SocketIo::builder()
        .with_state(messages)
        .with_state(roomcount)
        .build_layer();

    io.ns("/", on_connect);
    io.ns("/signal", on_signal);

    let app = axum::Router::new()
        .route("/", get(|| async { "Hello, World!" }))
        .route("/hello", get(handler))
        .with_state(io)
        .layer(
            ServiceBuilder::new()
                .layer(CorsLayer::permissive())
                .layer(layer),
        );

    info!("Starting server");

    let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();

    Ok(())
}

state.rs

use std::{collections::{HashMap, VecDeque}, sync::Arc};
use tokio::sync::RwLock;

#[derive(serde::Serialize, Clone, Debug)]
pub struct Message {
    pub text: String,
    pub user: String,
    pub date: chrono::DateTime<chrono::Utc>,
}

pub type RoomStore = HashMap<String, VecDeque<Message>>;

#[derive(Clone, Default)]
pub struct MessageStore {
    pub messages: Arc<RwLock<RoomStore>>,
}

impl MessageStore {
    pub async fn insert(&self, room: &str, message: Message) {
        let mut binding = self.messages.write().await;
        let messages = binding.entry(room.to_owned()).or_default();
        messages.push_front(message);
        messages.truncate(20);
    }

    pub async fn get(&self, room: &str) -> Vec<Message> {
        let messages = self.messages.read().await.get(room).cloned();
        messages.unwrap_or_default().into_iter().rev().collect()
    }
}

@aryabp
Author

aryabp commented Jul 15, 2024

I'm sorry if my contribution looks like a hit-and-run, but I'm in a rush with my paper and project. I'll come back to this thread and keep debugging. See you.

@Totodore Totodore linked a pull request Jul 25, 2024 that will close this issue
@Totodore
Owner

@aryabp I have added an example to the repository with a working implementation.
I suspect you had an issue with room management, which would lead to inconsistent state and then invalid WebRTC calls.
My implementation only covers the signaling server from the article and does not include the messaging part of your code.
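
To illustrate the kind of room bookkeeping issue suspected here: in the `join` handler posted earlier, room ids are pushed into a shared `Vec<String>` but nothing removes them when a socket leaves or disconnects, so a room can stay counted as occupied or full after its peers are gone. Below is a minimal, dependency-free sketch of one alternative that tracks members per room and drops them on leave. `RoomOccupancy`, `JoinOutcome`, and `MAX_PEERS` are hypothetical names, and this is not the implementation from the linked pull request:

```rust
use std::collections::HashMap;

/// Outcome of a join attempt, mirroring the three events used in the thread
/// (`room_created`, `room_joined`, `full_room`).
#[derive(Debug, PartialEq, Eq)]
enum JoinOutcome {
    Created,
    Joined,
    Full,
}

/// Hypothetical per-room occupancy tracker (not part of socketioxide).
#[derive(Default)]
struct RoomOccupancy {
    // room id -> ids of the sockets currently in that room
    rooms: HashMap<String, Vec<String>>,
}

impl RoomOccupancy {
    /// A call has at most two peers in this sketch.
    const MAX_PEERS: usize = 2;

    /// Moves `socket_id` into `room`, leaving any room it was in before.
    fn join(&mut self, room: &str, socket_id: &str) -> JoinOutcome {
        self.leave_all(socket_id);
        let members = self.rooms.entry(room.to_owned()).or_default();
        match members.len() {
            0 => {
                members.push(socket_id.to_owned());
                JoinOutcome::Created
            }
            n if n < Self::MAX_PEERS => {
                members.push(socket_id.to_owned());
                JoinOutcome::Joined
            }
            _ => JoinOutcome::Full,
        }
    }

    /// Removes `socket_id` from every room and drops rooms left empty.
    fn leave_all(&mut self, socket_id: &str) {
        for members in self.rooms.values_mut() {
            members.retain(|m| m.as_str() != socket_id);
        }
        self.rooms.retain(|_, members| !members.is_empty());
    }
}
```

In a real server, `leave_all` would presumably be called from a disconnect handler as well; that wiring is left out to keep the sketch free of external crates.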
