Skip to content

Commit

Permalink
webrtc: fix incorrect use of stream IDs, simplify
Browse files Browse the repository at this point in the history
The first stream has ID 1, not 3.
  • Loading branch information
marten-seemann committed Jul 3, 2023
1 parent 4800379 commit d6690f2
Showing 1 changed file with 21 additions and 51 deletions.
72 changes: 21 additions & 51 deletions p2p/transport/webrtc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ type connection struct {
remoteKey ic.PubKey
remoteMultiaddr ma.Multiaddr

m sync.Mutex
streams map[uint16]*stream
m sync.Mutex
streams map[uint16]*stream
nextStreamID atomic.Int32

acceptQueue chan dataChannel
idAllocator *sidAllocator

ctx context.Context
cancel context.CancelFunc
Expand All @@ -77,10 +77,8 @@ func newConnection(
remoteKey ic.PubKey,
remoteMultiaddr ma.Multiaddr,
) (*connection, error) {
idAllocator := newSidAllocator(direction)

ctx, cancel := context.WithCancel(context.Background())
conn := &connection{
c := &connection{
pc: pc,
transport: transport,
scope: scope,
Expand All @@ -94,14 +92,20 @@ func newConnection(
ctx: ctx,
cancel: cancel,
streams: make(map[uint16]*stream),
idAllocator: idAllocator,

acceptQueue: make(chan dataChannel, maxAcceptQueueLen),
}
switch direction {
case network.DirInbound:
c.nextStreamID.Store(1)
case network.DirOutbound:
// stream ID 0 is used for the Noise handshake stream
c.nextStreamID.Store(2)
}

pc.OnConnectionStateChange(conn.onConnectionStateChange)
pc.OnConnectionStateChange(c.onConnectionStateChange)
pc.OnDataChannel(func(dc *webrtc.DataChannel) {
if conn.IsClosed() {
if c.IsClosed() {
return
}
dc.OnOpen(func() {
Expand All @@ -111,7 +115,7 @@ func newConnection(
return
}
select {
case conn.acceptQueue <- dataChannel{rwc, dc}:
case c.acceptQueue <- dataChannel{rwc, dc}:
default:
log.Warnf("connection busy, rejecting stream")
b, _ := proto.Marshal(&pb.Message{Flag: pb.Message_RESET.Enum()})
Expand All @@ -122,7 +126,7 @@ func newConnection(
})
})

return conn, nil
return c, nil
}

// ConnState implements transport.CapableConn
Expand Down Expand Up @@ -156,11 +160,12 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error
return nil, c.closeErr
}

streamID, err := c.idAllocator.nextID()
if err != nil {
return nil, err
id := c.nextStreamID.Add(2) - 2
if id > math.MaxUint16 {
return nil, errors.New("exhausted stream ID space")
}
dc, err := c.pc.CreateDataChannel("", &webrtc.DataChannelInit{ID: streamID})
streamID := uint16(id)
dc, err := c.pc.CreateDataChannel("", &webrtc.DataChannelInit{ID: &streamID})
if err != nil {
return nil, err
}
Expand All @@ -173,7 +178,7 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error
rwc,
nil,
nil,
func() { c.removeStream(*streamID) },
func() { c.removeStream(streamID) },
)
if err := c.addStream(str); err != nil {
str.Close()
Expand Down Expand Up @@ -305,38 +310,3 @@ func (c *connection) setRemotePeer(id peer.ID) {
func (c *connection) setRemotePublicKey(key ic.PubKey) {
c.remoteKey = key
}

// sidAllocator is a helper struct to allocate stream IDs for datachannels. ID
// reuse is not currently implemented. This prevents streams in pion from hanging
// with `invalid DCEP message` errors.
// The id is picked using the scheme described in:
// https://datatracker.ietf.org/doc/html/draft-ietf-rtcweb-data-channel-08#section-6.5
// By definition, the DTLS role for inbound connections is set to DTLS Server,
// and outbound connections are DTLS Client.
type sidAllocator struct {
n atomic.Uint32
}

func newSidAllocator(direction network.Direction) *sidAllocator {
switch direction {
case network.DirInbound:
// server will use odd values
a := new(sidAllocator)
a.n.Store(1)
return a
case network.DirOutbound:
// client will use even values
return new(sidAllocator)
default:
panic(fmt.Sprintf("create SID allocator for direction: %s", direction))
}
}

func (a *sidAllocator) nextID() (*uint16, error) {
nxt := a.n.Add(2)
if nxt > math.MaxUint16 {
return nil, fmt.Errorf("sid exhausted")
}
result := uint16(nxt)
return &result, nil
}

0 comments on commit d6690f2

Please sign in to comment.