Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: mitigate the latency issue by prioritizing asynchronous writes #563

Merged
merged 3 commits into from
Mar 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"golang.org/x/sys/unix"

"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
Expand Down Expand Up @@ -51,9 +52,9 @@

el := eng.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
err = el.poller.UrgentTrigger(el.register, c)
err = el.poller.Trigger(queue.HighPriority, el.register, c)
if err != nil {
eng.opts.Logger.Errorf("UrgentTrigger() failed due to error: %v", err)
eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err)

Check warning on line 57 in acceptor_unix.go

View check run for this annotation

Codecov / codecov/patch

acceptor_unix.go#L57

Added line #L57 was not covered by tests
_ = unix.Close(nfd)
c.release()
}
Expand Down
5 changes: 3 additions & 2 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/panjf2000/gnet/v2/internal/math"
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/buffer/ring"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
Expand Down Expand Up @@ -126,7 +127,7 @@ func (cli *Client) Start() error {

// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
// Stop the ticker.
if cli.opts.Ticker {
cli.el.engine.ticker.cancel()
Expand Down Expand Up @@ -233,7 +234,7 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
ccb := &connWithCallback{c: gc, cb: func() {
close(connOpened)
}}
err = cli.el.poller.UrgentTrigger(cli.el.register, ccb)
err = cli.el.poller.Trigger(queue.HighPriority, cli.el.register, ccb)
if err != nil {
gc.Close()
return nil, err
Expand Down
11 changes: 6 additions & 5 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/panjf2000/gnet/v2/internal/gfd"
gio "github.com/panjf2000/gnet/v2/internal/io"
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/buffer/elastic"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
Expand Down Expand Up @@ -442,18 +443,18 @@ func (c *conn) AsyncWrite(buf []byte, callback AsyncCallback) error {
}
return err
}
return c.loop.poller.Trigger(c.asyncWrite, &asyncWriteHook{callback, buf})
return c.loop.poller.Trigger(queue.HighPriority, c.asyncWrite, &asyncWriteHook{callback, buf})
}

func (c *conn) AsyncWritev(bs [][]byte, callback AsyncCallback) error {
if c.isDatagram {
return errorx.ErrUnsupportedOp
}
return c.loop.poller.Trigger(c.asyncWritev, &asyncWritevHook{callback, bs})
return c.loop.poller.Trigger(queue.HighPriority, c.asyncWritev, &asyncWritevHook{callback, bs})
}

func (c *conn) Wake(callback AsyncCallback) error {
return c.loop.poller.UrgentTrigger(func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
err = c.loop.wake(c)
if callback != nil {
_ = callback(c, err)
Expand All @@ -463,7 +464,7 @@ func (c *conn) Wake(callback AsyncCallback) error {
}

func (c *conn) CloseWithCallback(callback AsyncCallback) error {
return c.loop.poller.Trigger(func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
err = c.loop.close(c, nil)
if callback != nil {
_ = callback(c, err)
Expand All @@ -473,7 +474,7 @@ func (c *conn) CloseWithCallback(callback AsyncCallback) error {
}

func (c *conn) Close() error {
return c.loop.poller.Trigger(func(_ interface{}) (err error) {
return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) {
err = c.loop.close(c, nil)
return
}, nil)
Expand Down
13 changes: 7 additions & 6 deletions engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

"github.com/panjf2000/gnet/v2/internal/gfd"
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/pkg/errors"
)

Expand Down Expand Up @@ -202,17 +203,17 @@
eng.eventHandler.OnShutdown(s)

// Notify all event-loops to exit.
eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
err := el.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
eng.eventLoops.iterate(func(i int, el *eventloop) bool {
err := el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to call UrgentTrigger on sub event-loop when stopping engine: %v", err)
eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for event-loop(%d): %v", i, err)

Check warning on line 209 in engine_unix.go

View check run for this annotation

Codecov / codecov/patch

engine_unix.go#L209

Added line #L209 was not covered by tests
}
return true
})
if eng.acceptor != nil {
err := eng.acceptor.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
err := eng.acceptor.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to call UrgentTrigger on main event-loop when stopping engine: %v", err)
eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for main event-loop: %v", err)

Check warning on line 216 in engine_unix.go

View check run for this annotation

Codecov / codecov/patch

engine_unix.go#L216

Added line #L216 was not covered by tests
}
}

Expand Down Expand Up @@ -299,7 +300,7 @@
return errors.ErrInvalidConn
}
if urgent {
return el.poller.UrgentTrigger(el.execCmd, cmd)
return el.poller.Trigger(queue.LowPriority, el.execCmd, cmd)
}
return el.poller.Trigger(el.execCmd, cmd)
}
Expand Down
7 changes: 5 additions & 2 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/panjf2000/gnet/v2/internal/io"
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
)
Expand Down Expand Up @@ -253,8 +254,10 @@ func (el *eventloop) ticker(ctx context.Context) {
switch action {
case None:
case Shutdown:
err := el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
el.getLogger().Debugf("stopping ticker in event-loop(%d) from OnTick(), UrgentTrigger:%v", el.idx, err)
// It seems reasonable to mark this as low-priority, waiting for some tasks like asynchronous writes
// to finish up before shutting down the service.
err := el.poller.Trigger(queue.LowPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
el.getLogger().Debugf("failed to enqueue shutdown signal of high-priority for event-loop(%d): %v", el.idx, err)
}
if timer == nil {
timer = time.NewTimer(delay)
Expand Down
47 changes: 20 additions & 27 deletions internal/netpoll/epoll_default_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ import (

// Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct {
fd int // epoll fd
efd int // eventfd
efdBuf []byte // efd buffer to read an 8-byte integer
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
fd int // epoll fd
efd int // eventfd
efdBuf []byte // efd buffer to read an 8-byte integer
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
highPriorityEventsThreshold int32 // threshold of high-priority events
}

// OpenPoller instantiates a poller.
Expand All @@ -63,6 +64,7 @@ func OpenPoller() (poller *Poller, err error) {
}
poller.asyncTaskQueue = queue.NewLockFreeQueue()
poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue()
poller.highPriorityEventsThreshold = MaxPollEventsCap
return
}

Expand All @@ -81,31 +83,22 @@ var (
b = (*(*[8]byte)(unsafe.Pointer(&u)))[:]
)

// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events,
// then the poller will get tasks from urgentAsyncTaskQueue and run them.
// Trigger enqueues task and wakes up the poller to process pending tasks.
// By default, any incoming task will enqueued into urgentAsyncTaskQueue
// before the threshold of high-priority events is reached. When it happens,
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// so only those urgent tasks should be put into this queue.
func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) {
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.urgentAsyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Write(p.efd, b); err == unix.EAGAIN {
err = nil
}
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
// There might be some low-priority tasks overflowing into urgentAsyncTaskQueue in a flash,
// but that's tolerable because it ought to be a rare case.
p.urgentAsyncTaskQueue.Enqueue(task)
}
return os.NewSyscallError("write", err)
}

// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue,
// call this method when the task is not so urgent, for instance writing data back to the peer.
//
// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Write(p.efd, b); err == unix.EAGAIN {
err = nil
Expand Down
47 changes: 20 additions & 27 deletions internal/netpoll/epoll_optimized_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ import (

// Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct {
fd int // epoll fd
epa *PollAttachment // PollAttachment for waking events
efdBuf []byte // efd buffer to read an 8-byte integer
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
fd int // epoll fd
epa *PollAttachment // PollAttachment for waking events
efdBuf []byte // efd buffer to read an 8-byte integer
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
highPriorityEventsThreshold int32 // threshold of high-priority events
}

// OpenPoller instantiates a poller.
Expand All @@ -64,6 +65,7 @@ func OpenPoller() (poller *Poller, err error) {
}
poller.asyncTaskQueue = queue.NewLockFreeQueue()
poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue()
poller.highPriorityEventsThreshold = MaxPollEventsCap
return
}

Expand All @@ -82,31 +84,22 @@ var (
b = (*(*[8]byte)(unsafe.Pointer(&u)))[:]
)

// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events,
// then the poller will get tasks from urgentAsyncTaskQueue and run them.
// Trigger enqueues task and wakes up the poller to process pending tasks.
// By default, any incoming task will enqueued into urgentAsyncTaskQueue
// before the threshold of high-priority events is reached. When it happens,
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// so only those urgent tasks should be put into this queue.
func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) {
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.urgentAsyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Write(p.epa.FD, b); err == unix.EAGAIN {
err = nil
}
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
// There might be some low-priority tasks overflowing into urgentAsyncTaskQueue in a flash,
// but that's tolerable because it ought to be a rare case.
p.urgentAsyncTaskQueue.Enqueue(task)
}
return os.NewSyscallError("write", err)
}

// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue,
// call this method when the task is not so urgent, for instance writing data back to the peer.
//
// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Write(p.epa.FD, b); err == unix.EAGAIN {
err = nil
Expand Down
43 changes: 18 additions & 25 deletions internal/netpoll/kqueue_default_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (

// Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct {
fd int
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
fd int
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
highPriorityEventsThreshold int32 // threshold of high-priority events
}

// OpenPoller instantiates a poller.
Expand All @@ -58,6 +59,7 @@ func OpenPoller() (poller *Poller, err error) {
}
poller.asyncTaskQueue = queue.NewLockFreeQueue()
poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue()
poller.highPriorityEventsThreshold = MaxPollEventsCap
return
}

Expand All @@ -72,31 +74,22 @@ var note = []unix.Kevent_t{{
Fflags: unix.NOTE_TRIGGER,
}}

// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events,
// then the poller will get tasks from urgentAsyncTaskQueue and run them.
// Trigger enqueues task and wakes up the poller to process pending tasks.
// By default, any incoming task will enqueued into urgentAsyncTaskQueue
// before the threshold of high-priority events is reached. When it happens,
// any asks other than high-priority tasks will be shunted to asyncTaskQueue.
//
// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// so only those urgent tasks should be put into this queue.
func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) {
// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.urgentAsyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN {
err = nil
}
if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold {
p.asyncTaskQueue.Enqueue(task)
} else {
// There might be some low-priority tasks overflowing into urgentAsyncTaskQueue in a flash,
// but that's tolerable because it ought to be a rare case.
p.urgentAsyncTaskQueue.Enqueue(task)
}
return os.NewSyscallError("kevent trigger", err)
}

// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue,
// call this method when the task is not so urgent, for instance writing data back to the peer.
//
// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN {
err = nil
Expand Down
Loading
Loading