Skip to content

Commit

Permalink
opt: close file descriptor after OnClose() for UDP along with code im…
Browse files Browse the repository at this point in the history
…provement

Updates #621

Follows up #622
  • Loading branch information
panjf2000 committed Jul 3, 2024
1 parent 1ed4d08 commit 26e13bb
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 51 deletions.
80 changes: 43 additions & 37 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"io"
"net"
"os"
"strings"
"time"
Expand Down Expand Up @@ -226,28 +227,38 @@ loop:
return nil
}

func (el *eventloop) close(c *conn, err error) (rerr error) {
if addr := c.localAddr; addr != nil && strings.HasPrefix(c.localAddr.Network(), "udp") {
rerr = el.poller.Delete(c.fd)
func (el *eventloop) close(c *conn, err error) error {
if !c.opened || el.connections.getConn(c.fd) == nil {
return nil // ignore stale connections
}

action := el.eventHandler.OnClose(c, err)

var errStr strings.Builder

if _, ok := c.localAddr.(*net.UDPAddr); ok {
if err := el.poller.Delete(c.fd); err != nil {
err = fmt.Errorf("failed to delete fd=%d from poller in event-loop(%d): %v",
c.fd, el.idx, os.NewSyscallError("delete", err))
errStr.WriteString(err.Error())
errStr.WriteString(" | ")

Check warning on line 244 in eventloop_unix.go

View check run for this annotation

Codecov / codecov/patch

eventloop_unix.go#L241-L244

Added lines #L241 - L244 were not covered by tests
}
if _, ok := el.listeners[c.fd]; !ok {
rerr = unix.Close(c.fd)
if err := unix.Close(c.fd); err != nil {
err = fmt.Errorf("failed to close fd=%d in event-loop(%d): %v",
c.fd, el.idx, os.NewSyscallError("close", err))
errStr.WriteString(err.Error())

Check warning on line 250 in eventloop_unix.go

View check run for this annotation

Codecov / codecov/patch

eventloop_unix.go#L248-L250

Added lines #L248 - L250 were not covered by tests
}
el.connections.delConn(c)
}
if el.eventHandler.OnClose(c, err) == Shutdown {
return errorx.ErrEngineShutdown
}
c.release()
return
}

if !c.opened || el.connections.getConn(c.fd) == nil {
return // ignore stale connections
if errStr.Len() > 0 {
return errors.New(strings.TrimSuffix(errStr.String(), " | "))

Check warning on line 256 in eventloop_unix.go

View check run for this annotation

Codecov / codecov/patch

eventloop_unix.go#L256

Added line #L256 was not covered by tests
}
return el.handleAction(c, action)
}

el.connections.delConn(c)
if el.eventHandler.OnClose(c, err) == Shutdown {
rerr = errorx.ErrEngineShutdown
}

// Send residual data in buffer back to the remote before actually closing the connection.
for !c.outboundBuffer.IsEmpty() {
Expand All @@ -262,8 +273,9 @@ func (el *eventloop) close(c *conn, err error) (rerr error) {
}
}

c.release()

err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd)
var errStr strings.Builder
if err0 != nil {
err0 = fmt.Errorf("failed to delete fd=%d from poller in event-loop(%d): %v",
c.fd, el.idx, os.NewSyscallError("delete", err0))
Expand All @@ -276,16 +288,10 @@ func (el *eventloop) close(c *conn, err error) (rerr error) {
errStr.WriteString(err1.Error())
}
if errStr.Len() > 0 {
if rerr != nil {
el.getLogger().Errorf(strings.TrimSuffix(errStr.String(), " | "))
} else {
rerr = errors.New(strings.TrimSuffix(errStr.String(), " | "))
}
return errors.New(strings.TrimSuffix(errStr.String(), " | "))

Check warning on line 291 in eventloop_unix.go

View check run for this annotation

Codecov / codecov/patch

eventloop_unix.go#L291

Added line #L291 was not covered by tests
}

c.release()

return
return el.handleAction(c, action)
}

func (el *eventloop) wake(c *conn) error {
Expand Down Expand Up @@ -333,19 +339,6 @@ func (el *eventloop) ticker(ctx context.Context) {
}
}

func (el *eventloop) handleAction(c *conn, action Action) error {
switch action {
case None:
return nil
case Close:
return el.close(c, nil)
case Shutdown:
return errorx.ErrEngineShutdown
default:
return nil
}
}

func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
n, sa, err := unix.Recvfrom(fd, el.buffer, 0)
if err != nil {
Expand All @@ -372,6 +365,19 @@ func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error
return nil
}

func (el *eventloop) handleAction(c *conn, action Action) error {
switch action {
case None:
return nil
case Close:
return el.close(c, nil)
case Shutdown:
return errorx.ErrEngineShutdown
default:
return nil
}
}

/*
func (el *eventloop) execCmd(itf interface{}) (err error) {
cmd := itf.(*asyncCmd)
Expand Down
31 changes: 17 additions & 14 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"bytes"
"context"
"errors"
"net"
"runtime"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -188,28 +188,31 @@ func (el *eventloop) wake(c *conn) error {
}

func (el *eventloop) close(c *conn, err error) error {
if addr := c.localAddr; addr != nil && strings.HasPrefix(addr.Network(), "udp") {
action := el.eventHandler.OnClose(c, err)
if _, ok := el.connections[c]; c.localAddr == nil || !ok {
return nil // ignore stale wakes.
}

action := el.eventHandler.OnClose(c, err)
err = nil

if _, ok := c.localAddr.(*net.UDPAddr); ok {
if c.rawConn != nil {
if err := c.rawConn.Close(); err != nil {
el.getLogger().Errorf("failed to close connection(%s), error:%v", c.remoteAddr.String(), err)
}
err = c.rawConn.Close()
}
c.release()
if err != nil {
return err
}
return el.handleAction(c, action)
}

if _, ok := el.connections[c]; !ok {
return nil // ignore stale wakes.
}

delete(el.connections, c)
el.incConn(-1)
action := el.eventHandler.OnClose(c, err)
if err := c.rawConn.Close(); err != nil {
el.getLogger().Errorf("failed to close connection(%s), error:%v", c.remoteAddr.String(), err)
}
err = c.rawConn.Close()
c.release()
if err != nil {
return err
}

return el.handleAction(c, action)
}
Expand Down

0 comments on commit 26e13bb

Please sign in to comment.