Skip to content

Commit

Permalink
WebRTC transport implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ckousik committed Nov 18, 2022
1 parent 8e90ed8 commit c3b1aa4
Show file tree
Hide file tree
Showing 21 changed files with 3,282 additions and 13 deletions.
29 changes: 23 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
github.com/google/gopacket v1.1.19
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-cid v0.3.2
Expand Down Expand Up @@ -47,6 +48,13 @@ require (
github.com/multiformats/go-multistream v0.3.3
github.com/multiformats/go-varint v0.0.6
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/pion/datachannel v1.5.2
github.com/pion/dtls/v2 v2.1.5
github.com/pion/ice/v2 v2.2.6
github.com/pion/logging v0.2.2
github.com/pion/stun v0.3.5
github.com/pion/transport v0.13.1
github.com/pion/webrtc/v3 v3.1.43
github.com/prometheus/client_golang v1.13.0
github.com/raulk/go-watchdog v1.3.0
github.com/stretchr/testify v1.8.0
Expand All @@ -61,29 +69,29 @@ require (
require (
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/coreos/go-systemd/v22 v22.4.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgraph-io/badger v1.6.2 // indirect
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/dgraph-io/ristretto v0.1.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/huin/goupnp v1.0.3 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/cpuid/v2 v2.1.1 // indirect
github.com/koron/go-ssdp v0.0.3 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/marten-seemann/qpack v0.3.0 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.3 // indirect
Expand All @@ -94,17 +102,26 @@ require (
github.com/miekg/dns v1.1.50 // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/ginkgo/v2 v2.2.0 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pion/interceptor v0.1.12 // indirect
github.com/pion/mdns v0.0.5 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.10 // indirect
github.com/pion/rtp v1.7.13 // indirect
github.com/pion/sctp v1.8.2 // indirect
github.com/pion/sdp/v3 v3.0.6 // indirect
github.com/pion/srtp/v2 v2.0.10 // indirect
github.com/pion/turn/v2 v2.0.8 // indirect
github.com/pion/udp v0.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.15.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
Expand Down
85 changes: 78 additions & 7 deletions go.sum

Large diffs are not rendered by default.

250 changes: 250 additions & 0 deletions p2p/transport/webrtc/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package libp2pwebrtc

import (
"context"
"os"
"sync"

ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
tpt "github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
"github.com/pion/webrtc/v3"
)

var _ tpt.CapableConn = &connection{}

type connection struct {
pc *webrtc.PeerConnection
transport *WebRTCTransport
scope network.ConnManagementScope

localPeer peer.ID
privKey ic.PrivKey
localMultiaddr ma.Multiaddr

remotePeer peer.ID
remoteKey ic.PubKey
remoteMultiaddr ma.Multiaddr

streams map[uint16]*dataChannel

accept chan network.MuxedStream

ctx context.Context
cancel context.CancelFunc
m sync.Mutex
}

func newConnection(
pc *webrtc.PeerConnection,
transport *WebRTCTransport,
scope network.ConnManagementScope,

localPeer peer.ID,
privKey ic.PrivKey,
localMultiaddr ma.Multiaddr,

remotePeer peer.ID,
remoteKey ic.PubKey,
remoteMultiaddr ma.Multiaddr,
) *connection {
accept := make(chan network.MuxedStream, 10)

ctx, cancel := context.WithCancel(context.Background())

conn := &connection{
pc: pc,
transport: transport,
scope: scope,

localPeer: localPeer,
privKey: privKey,
localMultiaddr: localMultiaddr,

remotePeer: remotePeer,
remoteKey: remoteKey,
remoteMultiaddr: remoteMultiaddr,
ctx: ctx,
cancel: cancel,
streams: make(map[uint16]*dataChannel),

accept: accept,
}

pc.OnDataChannel(func(dc *webrtc.DataChannel) {
log.Debugf("[%s] incoming datachannel: %s", localPeer, dc.Label())
id := *dc.ID()
var stream *dataChannel
dc.OnOpen(func() {
// datachannel cannot be detached before opening
rwc, err := dc.Detach()
if err != nil {
log.Errorf("[%s] could not detch channel: %s", localPeer, dc.Label())
return
}
stream = newDataChannel(dc, rwc, pc, nil, nil)
conn.addStream(id, stream)
accept <- stream
})

dc.OnClose(func() {
stream.remoteClosed()
conn.removeStream(id)
})
})

pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
if state == webrtc.PeerConnectionStateClosed || state == webrtc.PeerConnectionStateDisconnected {
conn.Close()
}
})

return conn
}

// ConnState implements transport.CapableConn
func (c *connection) ConnState() network.ConnectionState {
return network.ConnectionState{}
}

// Implement network.MuxedConn

func (c *connection) Close() error {
if c.IsClosed() {
return nil
}

c.scope.Done()
// cleanup routine
for _, stream := range c.streams {
_ = stream.Close()
}
c.cancel()
_ = c.pc.Close()
return nil
}

func (c *connection) IsClosed() bool {
select {
case <-c.ctx.Done():
return true
default:
}
return false
}

func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error) {
if c.IsClosed() {
return nil, os.ErrClosed
}

result := make(chan struct {
network.MuxedStream
error
})
dc, err := c.pc.CreateDataChannel("", nil)
if err != nil {
return nil, err
}

streamID := *dc.ID()
var stream *dataChannel
dc.OnOpen(func() {
rwc, err := dc.Detach()
if err != nil {
result <- struct {
network.MuxedStream
error
}{nil,
errDatachannel("could not detach", err),
}
return
}
stream = newDataChannel(dc, rwc, c.pc, nil, nil)
c.addStream(streamID, stream)
result <- struct {
network.MuxedStream
error
}{stream, err}
})

dc.OnClose(func() {
stream.remoteClosed()
c.removeStream(streamID)
})

select {
case <-ctx.Done():
_ = dc.Close()
return nil, ctx.Err()
case r := <-result:
return r.MuxedStream, r.error
}
}

func (c *connection) AcceptStream() (network.MuxedStream, error) {
select {
case <-c.ctx.Done():
return nil, os.ErrClosed
case stream := <-c.accept:
return stream, nil
}
}

// implement network.ConnSecurity
func (c *connection) LocalPeer() peer.ID {
return c.localPeer
}

// only used during setup
func (c *connection) setRemotePeer(id peer.ID) {
c.remotePeer = id
}

func (c *connection) LocalPrivateKey() ic.PrivKey {
return c.privKey
}

func (c *connection) RemotePeer() peer.ID {
return c.remotePeer
}

func (c *connection) RemotePublicKey() ic.PubKey {
return c.remoteKey
}

func (c *connection) setRemotePublicKey(key ic.PubKey) {
c.remoteKey = key
}

// implement network.ConnMultiaddrs
func (c *connection) LocalMultiaddr() ma.Multiaddr {
return c.localMultiaddr
}

func (c *connection) RemoteMultiaddr() ma.Multiaddr {
return c.remoteMultiaddr
}

// implement network.ConnScoper
func (c *connection) Scope() network.ConnScope {
return c.scope
}

func (c *connection) Transport() tpt.Transport {
return c.transport
}

func (c *connection) addStream(id uint16, stream *dataChannel) {
c.m.Lock()
defer c.m.Unlock()
c.streams[id] = stream
}

func (c *connection) removeStream(id uint16) {
c.m.Lock()
defer c.m.Unlock()
delete(c.streams, id)
}
Loading

0 comments on commit c3b1aa4

Please sign in to comment.