Skip to content

Commit

Permalink
fix: remove deadlock in multiPacketListener (#211)
Browse files Browse the repository at this point in the history
* fix: remove deadlock in `multiPacketListener`.

* fix test error

* Move reading from the `PacketConn` outside the `select`.

* Revert "fix test error"

This reverts commit 552a1e6.

* Do not set `m.pc` to nil.

* Move buffer down.

* Undo vpc creation.

* Add mutex back to protect against concurrent `Close()` calls.

* Move return when connection is closed down.

* Do not close the `readCh` or exit on a closed PacketConn.
  • Loading branch information
sbruens authored Sep 18, 2024
1 parent b561f49 commit 8ee3b79
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions service/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,42 +128,40 @@ type readRequest struct {

type virtualPacketConn struct {
net.PacketConn
mu sync.Mutex // Mutex to protect access to the channels
readCh chan readRequest
readCh chan readRequest

mu sync.Mutex // Mutex to protect against race conditions when closing the connection.
closeCh chan struct{}
onCloseFunc OnCloseFunc
}

func (pc *virtualPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
func (pc *virtualPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
respCh := make(chan struct {
n int
addr net.Addr
err error
}, 1)

pc.mu.Lock()
if pc.readCh == nil {
pc.mu.Unlock()
return 0, nil, net.ErrClosed
}
pc.readCh <- readRequest{
select {
case pc.readCh <- readRequest{
buffer: p,
respCh: respCh,
}:
case <-pc.closeCh:
return 0, nil, net.ErrClosed
}
pc.mu.Unlock()

resp := <-respCh
return resp.n, resp.addr, resp.err
}

// Close closes the virtualPacketConn. It must be called once, and only once,
// per virtualPacketConn.
func (pc *virtualPacketConn) Close() error {
pc.mu.Lock()
defer pc.mu.Unlock()

if pc.readCh == nil {
return nil
}
pc.readCh = nil

close(pc.closeCh)
if pc.onCloseFunc != nil {
onCloseFunc := pc.onCloseFunc
pc.onCloseFunc = nil
Expand Down Expand Up @@ -287,10 +285,13 @@ func (m *multiPacketListener) Acquire() (net.PacketConn, error) {
m.readCh = make(chan readRequest)
m.doneCh = make(chan struct{})
go func() {
buffer := make([]byte, serverUDPBufferSize)
for {
n, addr, err := m.pc.ReadFrom(buffer)
buffer = buffer[:n]
select {
case req := <-m.readCh:
n, addr, err := pc.ReadFrom(req.buffer)
n := copy(req.buffer, buffer)
req.respCh <- struct {
n int
addr net.Addr
Expand All @@ -307,14 +308,14 @@ func (m *multiPacketListener) Acquire() (net.PacketConn, error) {
return &virtualPacketConn{
PacketConn: m.pc,
readCh: m.readCh,
closeCh: make(chan struct{}),
onCloseFunc: func() error {
m.mu.Lock()
defer m.mu.Unlock()
m.count--
if m.count == 0 {
close(m.doneCh)
m.pc.Close()
m.pc = nil
if m.onCloseFunc != nil {
onCloseFunc := m.onCloseFunc
m.onCloseFunc = nil
Expand Down

0 comments on commit 8ee3b79

Please sign in to comment.