Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
- add debug messages to UI
- delay webrtc ICE gathering until websocket ready
- show/hide password field based on PUBLISHER_PASSWORD
- fix channel count tracking
- fix bug: on publisher reconnect, update channel track
- update README
  • Loading branch information
porjo committed May 2, 2024
1 parent 7b5b9be commit a4d5a0e
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 94 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ A translator would act as a publisher and people wanting to hear the translation

## Building

Download [precompiled binary for Linux](https://github.com/porjo/babelcast/releases/latest) or build it yourself.
Download [precompiled binary](https://github.com/porjo/babelcast/releases/latest) or build it yourself.

## Usage

```
Usage of ./babelcast:
-debug
enable debug log
-port int
listen on this port (default 8080)
listen on this port (default 8080)
-webRoot string
web root directory (default "html")
web root directory (default "html")
```

Then point your web browser to `http://localhost:8080/`
Expand Down
39 changes: 11 additions & 28 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ type Conn struct {

channelName string

errChan chan error
infoChan chan string
trackQuitChan chan struct{}
errChan chan error
infoChan chan string

logger *slog.Logger

Expand All @@ -59,7 +58,6 @@ func NewConn(ws *websocket.Conn) *Conn {
c := &Conn{}
c.errChan = make(chan error)
c.infoChan = make(chan string)
c.trackQuitChan = make(chan struct{})
c.logger = slog.With("remote_addr", ws.RemoteAddr())
// wrap Gorilla conn with our conn so we can extend functionality
c.wsConn = ws
Expand All @@ -69,6 +67,8 @@ func NewConn(ws *websocket.Conn) *Conn {

func (c *Conn) setupSessionPublisher(offer webrtc.SessionDescription) error {

c.logger = c.logger.With("client_type", "publisher")

answer, err := c.peer.SetupPublisher(offer, c.rtcStateChangeHandler, c.rtcTrackHandlerPublisher, c.onIceCandidate)
if err != nil {
return err
Expand All @@ -88,6 +88,8 @@ func (c *Conn) setupSessionPublisher(offer webrtc.SessionDescription) error {

func (c *Conn) setupSessionSubscriber() error {

c.logger = c.logger.With("client_type", "subscriber")

channel := reg.GetChannel(c.channelName)
if channel == nil {
return fmt.Errorf("channel '%s' not found", c.channelName)
Expand Down Expand Up @@ -143,33 +145,12 @@ func (c *Conn) connectPublisher(cmd CmdConnect) error {
return nil
}

func (c *Conn) connectSubscriber(cmd CmdConnect) error {

if c.peer.pc == nil {
return fmt.Errorf("webrtc session not established")
}

if cmd.Channel == "" {
return fmt.Errorf("channel cannot be empty")
}
if channelRegexp.MatchString(cmd.Channel) {
return fmt.Errorf("channel name must contain only alphanumeric characters")
}

c.logger.Info("setting up subscriber for channel", "channel", c.channelName)

return nil
}

func (c *Conn) Close() {
c.Lock()
defer c.Unlock()
if c.hasClosed {
return
}
if c.trackQuitChan != nil {
close(c.trackQuitChan)
}
if c.peer.pc != nil {
c.peer.pc.Close()
}
Expand Down Expand Up @@ -224,7 +205,9 @@ func (c *Conn) rtcTrackHandlerPublisher(remoteTrack *webrtc.TrackRemote, receive
for {
i, _, readErr := remoteTrack.Read(rtpBuf)
if readErr != nil {
c.logger.Error("remoteTrack.Read error", "err", readErr)
if !errors.Is(readErr, io.EOF) {
c.logger.Error("remoteTrack.Read error", "err", readErr)
}
return
}

Expand Down Expand Up @@ -286,7 +269,7 @@ func (c *Conn) onIceCandidate(candidate *webrtc.ICECandidate) {
}

func (c *Conn) LogHandler(ctx context.Context) {
defer c.logger.Info("log goroutine quitting...")
defer c.logger.Info("log goroutine quit")
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -318,7 +301,7 @@ func (c *Conn) LogHandler(ctx context.Context) {
}

func (c *Conn) PingHandler(ctx context.Context) {
defer c.logger.Info("ws ping goroutine quitting...")
defer c.logger.Info("ws ping goroutine quit")
pingCh := time.Tick(PingInterval)
for {
select {
Expand Down
24 changes: 24 additions & 0 deletions html/css/common.css
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ input, textarea {
display: none;
}

label {
font-size: 0.8em;
margin: 5px 0;
display: inline-block;
}

#error {
border: 2px solid #8c4d4d;
background-color: #fff1f1;
Expand All @@ -78,6 +84,24 @@ input, textarea {
text-align: left;
}

#messages {
border: 2px solid #fff;
text-align: left;
padding: 10px;
}

#messages summary {
cursor: pointer;
}

#message-log {
font-family: monospace;
overflow: auto;
padding: 10px;
white-space: pre-wrap;
height: 100px;
}

.title {
font-family: 'Pacifico', cursive;
margin: 10px 0;
Expand Down
29 changes: 15 additions & 14 deletions html/js/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ ws_uri += path + "/ws";

var ws = new WebSocket(ws_uri);

// array of funcs to call when WS is ready
var onWSReady = [];

var error, msg;

var debug = (...m) => {
console.log(...m)
msg(m.join(' '))
}

error = (...m) => {
Expand All @@ -38,36 +42,33 @@ error = (...m) => {
errorEle.classList.remove('hidden');
}
msg = m => {
let d = new Date(Date.now()).toLocaleString();
// strip html
let div = document.createElement("div");
div.innerHtml = m.Message;
let a = div.innerText;
let msgEle = document.getElementById('messages');
msgEle.classList.add('message');
let insEle = document.createElement("div");
insEle.innerHTML = "<span class='time'>" + d + "</span><span class='sender'>" + m.Sender + "</span><span class='message'>" + a + "</span>";
msgEle.insertBefore(insEle, msgEle.firstChild);
let d = new Date(Date.now()).toISOString();
let msgEle = document.getElementById('message-log');
msgEle.prepend(d + ' ' + m + '\n');
}

var wsSend = m => {
if (ws.readyState === 1) {
ws.send(JSON.stringify(m));
} else {
debug("WS send not ready, delaying...");
debug("ws: send not ready, delaying...", m);
setTimeout(function() {
ws.send(JSON.stringify(m));
}, 2000);
}
}

ws.onopen = function() {
debug("WS connection open");
debug("ws: connection open");
onWSReady.forEach(f => {
f()
})
};

//
// -------- WebRTC ------------
//

var pc = new RTCPeerConnection({

iceServers: [
Expand All @@ -86,21 +87,21 @@ pc.oniceconnectionstatechange = e => {
case "disconnected":
case "closed":
case "completed":
break;
case "connected":
document.getElementById('spinner').classList.add('hidden');
let cb = document.getElementById('connect-button');
if(cb) { cb.classList.remove('hidden') };
break;
default:
debug("ice state unknown", e);
debug("webrtc: ice state unknown", e);
break;
}
}

var startSession = sd => {
document.getElementById('spinner').classList.remove('hidden');
try {
debug("webrtc: set remote description")
pc.setRemoteDescription(new RTCSessionDescription({type: 'answer', sdp: sd}));
} catch (e) {
alert(e);
Expand Down
36 changes: 21 additions & 15 deletions html/js/publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ws.onmessage = function (e) {
if( 'Key' in wsMsg ) {
switch (wsMsg.Key) {
case 'info':
debug("server info", wsMsg.Value);
debug("server info: " + wsMsg.Value);
break;
case 'error':
error("server error", wsMsg.Value);
Expand All @@ -48,12 +48,15 @@ ws.onmessage = function (e) {
case 'ice_candidate':
pc.addIceCandidate(wsMsg.Value)
break;
case 'password_required':
document.getElementById('password-form').classList.remove('hidden');
break;
}
}
};

ws.onclose = function() {
debug("WS connection closed");
debug("ws: connection closed");
if (audioTrack) {
audioTrack.stop()
}
Expand All @@ -80,7 +83,10 @@ const signalMeter = document.querySelector('#microphone-meter meter');

navigator.mediaDevices.getUserMedia(constraints).then(stream => {
audioTrack = stream.getAudioTracks()[0];
stream.getTracks().forEach(track => pc.addTrack(track, stream))
stream.getTracks().forEach(track => {
debug("webrtc: add track")
pc.addTrack(track, stream)
})
// mute until we're ready
audioTrack.enabled = false;

Expand All @@ -101,17 +107,17 @@ navigator.mediaDevices.getUserMedia(constraints).then(stream => {
}, 50);
});

pc.createOffer().then(d => {
pc.setLocalDescription(d);
let val = {Key: 'session_publisher', Value: d};
wsSend(val);
}).catch(debug)
}).catch(debug)
let f = () => {
debug("webrtc: create offer")
pc.createOffer().then(d => {
debug("webrtc: set local description")
pc.setLocalDescription(d);
let val = { Key: 'session_publisher', Value: d };
wsSend(val);
}).catch(debug)
}
// create offer if WS is ready, otherwise queue
ws.readyState == WebSocket.OPEN ? f() : onWSReady.push(f)

}).catch(debug)

pc.onicecandidate = e => {
if (e.candidate && e.candidate.candidate !== "") {
let val = {Key: 'ice_candidate', Value: e.candidate};
wsSend(val);
}
}
1 change: 0 additions & 1 deletion html/js/soundmeter.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ function SoundMeter(context) {
}

SoundMeter.prototype.connectToSource = function(stream, callback) {
console.log('SoundMeter connecting');
try {
this.mic = this.context.createMediaStreamSource(stream);
this.mic.connect(this.script);
Expand Down
29 changes: 17 additions & 12 deletions html/js/subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ function updateChannels(channels) {
channelsEle.appendChild(c);
});
}

};

ws.onmessage = function (e) {
let wsMsg = JSON.parse(e.data);
if( 'Key' in wsMsg ) {
switch (wsMsg.Key) {
case 'info':
debug("server info:", wsMsg.Value);
debug("server info: " + wsMsg.Value);
break;
case 'error':
error("server error:", wsMsg.Value);
Expand All @@ -53,11 +52,10 @@ ws.onmessage = function (e) {
case 'channels':
updateChannels(wsMsg.Value);
break;
case "session_established": // wait for the message that session_subscriber was received
case "session_received": // wait for the message that session_subscriber was received
document.getElementById("channels").classList.remove("hidden");
document.getElementById('reload').classList.remove('hidden');
document.getElementById("spinner").classList.add("hidden");
console.log("session_established");
break;
case 'ice_candidate':
pc.addIceCandidate(wsMsg.Value)
Expand All @@ -67,17 +65,18 @@ ws.onmessage = function (e) {
};

ws.onclose = function() {
debug("WS connection closed");
debug("ws: connection closed")
pc.close()
document.getElementById('media').classList.add('hidden');
document.getElementById('media').classList.add('hidden')
clearInterval(getChannelsId);
};

//
// -------- WebRTC ------------
//

pc.ontrack = function (event) {
debug("Ontrack", event);
debug("webrtc: ontrack");
let el = document.createElement(event.track.kind);
el.srcObject = event.streams[0];
el.autoplay = true;
Expand All @@ -88,10 +87,16 @@ pc.ontrack = function (event) {

pc.addTransceiver('audio')

pc.createOffer().then(d => {
pc.setLocalDescription(d)
let val = {Key: 'session_subscriber', Value: d};
wsSend(val);
}).catch(debug);
let f = () => {
debug("webrtc: create offer")
pc.createOffer().then(d => {
debug("webrtc: set local description")
pc.setLocalDescription(d);
let val = { Key: 'session_subscriber', Value: d };
wsSend(val);
}).catch(debug)
}
// create offer if WS is ready, otherwise queue
ws.readyState == WebSocket.OPEN ? f() : onWSReady.push(f)

// ----------------------------------------------------------------
Loading

0 comments on commit a4d5a0e

Please sign in to comment.