Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

patch: v2.5.2 #600

Merged
merged 1 commit into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connection_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *conn) processIO(_ int, filter netpoll.IOEvent, flags netpoll.IOFlags) (
// 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF.
err = el.write(c)
default:
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
c.outboundBuffer.Release() // don't bother to write to a connection that is already broken
err = el.close(c, io.EOF)
}
}
Expand Down
8 changes: 4 additions & 4 deletions connection_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
el := c.loop
// First check for any unexpected non-IO events.
// For these events we just close the connection directly.
if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 {
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
if ev&(netpoll.ErrEvents|unix.EPOLLRDHUP) != 0 && ev&netpoll.ReadWriteEvents == 0 {
c.outboundBuffer.Release() // don't bother to write to a connection that is already broken
return el.close(c, io.EOF)
}
// Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority
Expand All @@ -43,14 +43,14 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
// to the remote first and then close the connection.
//
// We perform eventloop.write for EPOLLOUT because it can take good care of either case.
if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 {
if ev&(netpoll.WriteEvents|netpoll.ErrEvents) != 0 {
if err := el.write(c); err != nil {
return err
}
}
// Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in
// the socket buffer.
if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 {
if ev&(netpoll.ReadEvents|netpoll.ErrEvents) != 0 {
if err := el.read(c); err != nil {
return err
}
Expand Down
41 changes: 36 additions & 5 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,18 @@ func (el *eventloop) open(c *conn) error {
return el.handleAction(c, action)
}

func (el *eventloop) read0(itf interface{}) error {
return el.read(itf.(*conn))
}

const maxBytesTransferET = 1 << 22

func (el *eventloop) read(c *conn) error {
if !c.opened {
return nil
}

var recv int
isET := el.engine.opts.EdgeTriggeredIO
loop:
n, err := unix.Read(c.fd, el.buffer)
Expand All @@ -131,6 +138,7 @@ loop:
}
return el.close(c, os.NewSyscallError("read", err))
}
recv += n

c.buffer = el.buffer[:n]
action := el.eventHandler.OnTraffic(c)
Expand All @@ -144,13 +152,25 @@ loop:
_, _ = c.inboundBuffer.Write(c.buffer)
c.buffer = c.buffer[:0]

if isET || c.isEOF {
if c.isEOF || (isET && recv < maxBytesTransferET) {
goto loop
}

// To prevent infinite reading in ET mode and starving other events,
// we need to set up threshold for the maximum read bytes per connection
// on each event-loop. If the threshold is reached and there are still
// unread data in the socket buffer, we must issue another read event manually.
if isET && n == len(el.buffer) {
return el.poller.Trigger(queue.LowPriority, el.read0, c)
}

return nil
}

func (el *eventloop) write0(itf interface{}) error {
return el.write(itf.(*conn))
}

// The default value of UIO_MAXIOV/IOV_MAX is 1024 on Linux and most BSD-like OSs.
const iovMax = 1024

Expand All @@ -161,8 +181,9 @@ func (el *eventloop) write(c *conn) error {

isET := el.engine.opts.EdgeTriggeredIO
var (
n int
err error
n int
sent int
err error
)
loop:
iov, _ := c.outboundBuffer.Peek(-1)
Expand All @@ -182,14 +203,24 @@ loop:
default:
return el.close(c, os.NewSyscallError("write", err))
}
if isET && !c.outboundBuffer.IsEmpty() {
sent += n

if isET && !c.outboundBuffer.IsEmpty() && sent < maxBytesTransferET {
goto loop
}

// All data have been sent, it's no need to monitor the writable events for LT mode,
// remove the writable event from poller to help the future event-loops if necessary.
if !isET && c.outboundBuffer.IsEmpty() {
_ = el.poller.ModRead(&c.pollAttachment, false)
return el.poller.ModRead(&c.pollAttachment, false)
}

// To prevent infinite writing in ET mode and starving other events,
// we need to set up threshold for the maximum write bytes per connection
// on each event-loop. If the threshold is reached and there are still
// pending data to write, we must issue another write event manually.
if isET && !c.outboundBuffer.IsEmpty() {
return el.poller.Trigger(queue.HighPriority, el.write0, c)
}

return nil
Expand Down
30 changes: 15 additions & 15 deletions eventloop_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
func BenchmarkGC4El100k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 100000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 100000)
b.Run("Run-4-eventloop-100000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand All @@ -62,7 +62,7 @@ func BenchmarkGC4El100k(b *testing.B) {
func BenchmarkGC4El200k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 200000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 200000)
b.Run("Run-4-eventloop-200000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand All @@ -76,7 +76,7 @@ func BenchmarkGC4El200k(b *testing.B) {
func BenchmarkGC4El500k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 500000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 500000)
b.Run("Run-4-eventloop-500000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand Down Expand Up @@ -146,73 +146,73 @@ func TestServeGC(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 10000)
testServeGC(t, "tcp", ":0", true, true, 1, 10000)
})
t.Run("1-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 100000)
testServeGC(t, "tcp", ":0", true, true, 1, 100000)
})
t.Run("1-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 1000000)
testServeGC(t, "tcp", ":0", true, true, 1, 1000000)
})
t.Run("2-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 10000)
testServeGC(t, "tcp", ":0", true, true, 2, 10000)
})
t.Run("2-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 100000)
testServeGC(t, "tcp", ":0", true, true, 2, 100000)
})
t.Run("2-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 1000000)
testServeGC(t, "tcp", ":0", true, true, 2, 1000000)
})
t.Run("4-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 10000)
testServeGC(t, "tcp", ":0", true, true, 4, 10000)
})
t.Run("4-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 100000)
testServeGC(t, "tcp", ":0", true, true, 4, 100000)
})
t.Run("4-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 1000000)
testServeGC(t, "tcp", ":0", true, true, 4, 1000000)
})
t.Run("16-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 10000)
testServeGC(t, "tcp", ":0", true, true, 16, 10000)
})
t.Run("16-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 100000)
testServeGC(t, "tcp", ":0", true, true, 16, 100000)
})
t.Run("16-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 1000000)
testServeGC(t, "tcp", ":0", true, true, 16, 1000000)
})
})
}
Expand Down
11 changes: 8 additions & 3 deletions internal/netpoll/defs_poller_epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ const (
MinPollEventsCap = 32
// MaxAsyncTasksAtOneTime is the maximum amount of asynchronous tasks that the event-loop will process at one time.
MaxAsyncTasksAtOneTime = 256
// ErrEvents represents exceptional events that are not read/write, like socket being closed,
// reading/writing from/to a closed socket, etc.
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP
// ReadEvents represents readable events that are polled by epoll.
ReadEvents = unix.EPOLLIN | unix.EPOLLPRI
// WriteEvents represents writeable events that are polled by epoll.
WriteEvents = unix.EPOLLOUT
// ReadWriteEvents represents both readable and writeable events.
ReadWriteEvents = ReadEvents | WriteEvents
// ErrEvents represents exceptional events that occurred on the local side.
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP
)

type eventList struct {
Expand Down
26 changes: 10 additions & 16 deletions internal/netpoll/poller_epoll_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,57 +193,51 @@ func (p *Poller) Polling(callback PollEventHandler) error {
}
}

const (
readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP
writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP
readWriteEvents = readEvents | writeEvents
)

// AddReadWrite registers the given file-descriptor with readable and writable events to the poller.
func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readWriteEvents
var ev uint32 = ReadWriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// AddRead registers the given file-descriptor with readable event to the poller.
func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readEvents
var ev uint32 = ReadEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// AddWrite registers the given file-descriptor with writable event to the poller.
func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = writeEvents
var ev uint32 = WriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// ModRead renews the given file-descriptor with readable event in the poller.
func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readEvents
var ev uint32 = ReadEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl mod",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// ModReadWrite renews the given file-descriptor with readable and writable events in the poller.
func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readWriteEvents
var ev uint32 = ReadWriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl mod",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
Expand Down
26 changes: 10 additions & 16 deletions internal/netpoll/poller_epoll_ultimate.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,12 @@ func (p *Poller) Polling() error {
}
}

const (
readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP
writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP
readWriteEvents = readEvents | writeEvents
)

// AddReadWrite registers the given file-descriptor with readable and writable events to the poller.
func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readWriteEvents
ev.events = ReadWriteEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
Expand All @@ -215,9 +209,9 @@ func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
// AddRead registers the given file-descriptor with readable event to the poller.
func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readEvents
ev.events = ReadEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
Expand All @@ -226,9 +220,9 @@ func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
// AddWrite registers the given file-descriptor with writable event to the poller.
func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = writeEvents
ev.events = WriteEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
Expand All @@ -237,9 +231,9 @@ func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
// ModRead renews the given file-descriptor with readable event in the poller.
func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readEvents
ev.events = ReadEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev))
Expand All @@ -248,9 +242,9 @@ func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
// ModReadWrite renews the given file-descriptor with readable and writable events in the poller.
func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readWriteEvents
ev.events = ReadWriteEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev))
Expand Down
Loading