Skip to content

Commit

Permalink
Merge pull request #17 from porjo/fixes
Browse files Browse the repository at this point in the history
Fixes
  • Loading branch information
porjo authored May 5, 2024
2 parents 8fde5a9 + 39a0d7a commit b017bfb
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 244 deletions.
111 changes: 17 additions & 94 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ limitations under the License.
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -39,36 +38,28 @@ var channelRegexp = regexp.MustCompile("[^a-zA-Z0-9 ]+")

type Conn struct {
sync.Mutex

peer *WebRTCPeer

wsConn *websocket.Conn

peer *WebRTCPeer
wsConn *websocket.Conn
channelName string
infoChan chan string
logger *slog.Logger
hasClosed bool

errChan chan error
infoChan chan string

logger *slog.Logger

hasClosed bool
clientID string
isPublisher bool
}

func NewConn(ws *websocket.Conn) *Conn {
c := &Conn{}
c.errChan = make(chan error)
c.infoChan = make(chan string)
c.logger = slog.With("remote_addr", ws.RemoteAddr())
// wrap Gorilla conn with our conn so we can extend functionality
c.wsConn = ws

return c
}

func (c *Conn) setupSessionPublisher(offer webrtc.SessionDescription) error {

c.logger = c.logger.With("client_type", "publisher")

answer, err := c.peer.SetupPublisher(offer, c.rtcStateChangeHandler, c.rtcTrackHandlerPublisher, c.onIceCandidate)
if err != nil {
return err
Expand All @@ -88,11 +79,9 @@ func (c *Conn) setupSessionPublisher(offer webrtc.SessionDescription) error {

func (c *Conn) setupSessionSubscriber() error {

c.logger = c.logger.With("client_type", "subscriber")

channel := reg.GetChannel(c.channelName)
if channel == nil {
return fmt.Errorf("channel '%s' not found", c.channelName)
return fmt.Errorf("channel %q not found", c.channelName)
}

answer, err := c.peer.SetupSubscriber(channel, c.rtcStateChangeHandler, c.onIceCandidate)
Expand Down Expand Up @@ -130,9 +119,7 @@ func (c *Conn) connectPublisher(cmd CmdConnect) error {
return fmt.Errorf("incorrect password")
}

c.Lock()
c.channelName = cmd.Channel
c.Unlock()
c.logger.Info("setting up publisher for channel", "channel", c.channelName)

localTrack := <-c.peer.localTrackChan
Expand All @@ -146,11 +133,17 @@ func (c *Conn) connectPublisher(cmd CmdConnect) error {
}

func (c *Conn) Close() {
c.logger.Debug("close called")
c.Lock()
defer c.Unlock()
if c.hasClosed {
return
}
if c.isPublisher {
reg.RemovePublisher(c.channelName)
} else {
reg.RemoveSubscriber(c.channelName, c.clientID)
}
if c.peer.pc != nil {
c.peer.pc.Close()
}
Expand All @@ -161,13 +154,13 @@ func (c *Conn) Close() {
}

func (c *Conn) writeMsg(val interface{}) error {
c.Lock()
defer c.Unlock()
j, err := json.Marshal(val)
if err != nil {
return err
}
c.logger.Debug("write message", "msg", string(j))
c.Lock()
defer c.Unlock()
if err = c.wsConn.WriteMessage(websocket.TextMessage, j); err != nil {
return err
}
Expand Down Expand Up @@ -224,9 +217,6 @@ func (c *Conn) rtcTrackHandlerPublisher(remoteTrack *webrtc.TrackRemote, receive

// WebRTC callback function
func (c *Conn) rtcStateChangeHandler(connectionState webrtc.ICEConnectionState) {

//var err error

switch connectionState {
case webrtc.ICEConnectionStateConnected:
c.logger.Info("ice connected")
Expand All @@ -236,13 +226,7 @@ func (c *Conn) rtcStateChangeHandler(connectionState webrtc.ICEConnectionState)

case webrtc.ICEConnectionStateDisconnected:
c.logger.Info("ice disconnected")
c.Close()

// non blocking channel write, as receiving goroutine may already have quit
select {
case c.infoChan <- "ice disconnected":
default:
}
c.infoChan <- "ice disconnected"
}
}

Expand All @@ -267,64 +251,3 @@ func (c *Conn) onIceCandidate(candidate *webrtc.ICECandidate) {
return
}
}

func (c *Conn) LogHandler(ctx context.Context) {
defer c.logger.Info("log goroutine quit")
for {
select {
case <-ctx.Done():
return
case err := <-c.errChan:
j, err := json.Marshal(err.Error())
if err != nil {
c.logger.Error("marshal error", "err", err.Error())
}
m := wsMsg{Key: "error", Value: j}
err = c.writeMsg(m)
if err != nil {
c.logger.Error("writemsg error", "err", err.Error())
}
// end the WS session on error
c.Close()
case info := <-c.infoChan:
j, err := json.Marshal(info)
if err != nil {
c.logger.Error("marshal error", "err", err.Error())
}
m := wsMsg{Key: "info", Value: j}
err = c.writeMsg(m)
if err != nil {
c.logger.Error("writemsg error", "err", err.Error())
}
}
}
}

func (c *Conn) PingHandler(ctx context.Context) {
defer c.logger.Info("ws ping goroutine quit")
pingCh := time.Tick(PingInterval)
for {
select {
case <-ctx.Done():
return
case <-pingCh:
c.Lock()
// WriteControl can be called concurrently
err := c.wsConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WriteWait))
if err != nil {
c.Unlock()
c.logger.Error("ping client error", "err", err.Error())
return
}
c.Unlock()
}
}
}

/*
func hash(s string) uint32 {
h := fnv.New32a()
h.Write([]byte(s))
return h.Sum32()
}
*/
3 changes: 3 additions & 0 deletions html/js/subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ ws.onmessage = function (e) {
case 'ice_candidate':
pc.addIceCandidate(wsMsg.Value)
break;
case 'channel_closed':
error("channel '" + wsMsg.Value + "' closed by server")
break;
}
}
};
Expand Down
Loading

0 comments on commit b017bfb

Please sign in to comment.