Skip to content

Commit

Permalink
webrtc: close mux when closing listener
Browse files Browse the repository at this point in the history
There is currently a leak in the webrtc listener. When the listener
is closed the udp mux readloop just keeps running.
  • Loading branch information
sukunrt committed Feb 24, 2024
1 parent 473a5e9 commit 796429f
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 33 deletions.
47 changes: 28 additions & 19 deletions p2p/transport/webrtc/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type listener struct {
// used to control the lifecycle of the listener
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

var _ tpt.Listener = &listener{}
Expand Down Expand Up @@ -91,30 +92,27 @@ func newListener(transport *WebRTCTransport, laddr ma.Multiaddr, socket net.Pack
}

l.ctx, l.cancel = context.WithCancel(context.Background())
mux := udpmux.NewUDPMux(socket)
l.mux = mux
mux.Start()
l.mux = udpmux.NewUDPMux(socket)
l.mux.Start()

go l.listen()
l.wg.Add(1)
go func() {
defer l.wg.Done()
l.listen()
}()

return l, err
}

func (l *listener) listen() {
// Accepting a connection requires instantiating a peerconnection
// and a noise connection which is expensive. We therefore limit
// the number of in-flight connection requests. A connection
// is considered to be in flight from the instant it is handled
// until it is dequeued by a call to Accept, or errors out in some
// way.
inFlightQueueCh := make(chan struct{}, l.transport.maxInFlightConnections)
for i := uint32(0); i < l.transport.maxInFlightConnections; i++ {
inFlightQueueCh <- struct{}{}
}

// Accepting a connection requires instantiating a peerconnection and a noise connection
// which is expensive. We therefore limit the number of in-flight connection requests. A
// connection is considered to be in flight from the instant it is handled until it is
// dequeued by a call to Accept, or errors out in some way.
inFlightSemaphore := make(chan struct{}, l.transport.maxInFlightConnections)
for {
select {
case <-inFlightQueueCh:
case inFlightSemaphore <- struct{}{}:
case <-l.ctx.Done():
return
}
Expand All @@ -128,7 +126,7 @@ func (l *listener) listen() {
}

go func() {
defer func() { inFlightQueueCh <- struct{}{} }() // free this spot once again
defer func() { <-inFlightSemaphore }()

ctx, cancel := context.WithTimeout(l.ctx, candidateSetupTimeout)
defer cancel()
Expand All @@ -145,7 +143,7 @@ func (l *listener) listen() {
log.Warn("could not push connection: ctx done")
conn.Close()
case l.acceptQueue <- conn:
// acceptQueue is an unbuffered channel, so this block until the connection is accepted.
// acceptQueue is an unbuffered channel, so this blocks until the connection is accepted.
}
}()
}
Expand Down Expand Up @@ -307,7 +305,18 @@ func (l *listener) Close() error {
select {
case <-l.ctx.Done():
default:
l.cancel()
}
l.cancel()
l.mux.Close()
l.wg.Wait()
loop:
for {
select {
case conn := <-l.acceptQueue:
conn.Close()
default:
break loop
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/webrtc/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func TestStreamCloseAfterFINACK(t *testing.T) {
select {
case <-done:
t.Fatalf("Close should not have completed without processing FIN_ACK")
case <-time.After(2 * time.Second):
case <-time.After(200 * time.Millisecond):
}

b := make([]byte, 1)
Expand Down
Loading

0 comments on commit 796429f

Please sign in to comment.