Skip to content

Commit

Permalink
opt: reduce GC pause frequency for Conn.AsyncWrite on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Jul 9, 2021
1 parent 939da1b commit 477bb4f
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 21 deletions.
45 changes: 33 additions & 12 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package gnet

import (
"net"
"sync"

"github.com/panjf2000/gnet/pool/bytebuffer"
prb "github.com/panjf2000/gnet/pool/ringbuffer"
Expand All @@ -34,8 +35,14 @@ type stderr struct {
err error
}

type wakeReq struct {
c *stdConn
type signalTask struct {
run func(*stdConn) error
c *stdConn
}

type dataTask struct {
run func([]byte) (int, error)
buf []byte
}

type tcpConn struct {
Expand All @@ -47,6 +54,11 @@ type udpConn struct {
c *stdConn
}

var (
signalTaskPool = sync.Pool{New: func() interface{} { return new(signalTask) }}
dataTaskPool = sync.Pool{New: func() interface{} { return new(dataTask) }}
)

type stdConn struct {
ctx interface{} // user-defined context
conn net.Conn // original connection
Expand Down Expand Up @@ -134,6 +146,13 @@ func (c *stdConn) read() ([]byte, error) {
return c.codec.Decode(c)
}

func (c *stdConn) write(data []byte) (n int, err error) {
if c.conn != nil {
n, err = c.conn.Write(data)
}
return
}

// ================================= Public APIs of gnet.Conn =================================

func (c *stdConn) Read() []byte {
Expand Down Expand Up @@ -212,12 +231,10 @@ func (c *stdConn) BufferLength() int {
func (c *stdConn) AsyncWrite(buf []byte) (err error) {
var encodedBuf []byte
if encodedBuf, err = c.codec.Encode(c, buf); err == nil {
c.loop.ch <- func() (err error) {
if c.conn != nil {
_, err = c.conn.Write(encodedBuf)
}
return
}
task := dataTaskPool.Get().(*dataTask)
task.run = c.write
task.buf = encodedBuf
c.loop.ch <- task
}
return
}
Expand All @@ -228,14 +245,18 @@ func (c *stdConn) SendTo(buf []byte) (err error) {
}

func (c *stdConn) Wake() error {
c.loop.ch <- wakeReq{c}
task := signalTaskPool.Get().(*signalTask)
task.run = c.loop.loopWake
task.c = c
c.loop.ch <- task
return nil
}

func (c *stdConn) Close() error {
c.loop.ch <- func() error {
return c.loop.loopCloseConn(c)
}
task := signalTaskPool.Get().(*signalTask)
task.run = c.loop.loopCloseConn
task.c = c
c.loop.ch <- task
return nil
}

Expand Down
16 changes: 9 additions & 7 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func (el *eventloop) loopRun(lockOSThread bool) {
el.svr.loopWG.Done()
}()

for v := range el.ch {
switch v := v.(type) {
for i := range el.ch {
switch v := i.(type) {
case error:
err = v
case *stdConn:
Expand All @@ -87,10 +87,12 @@ func (el *eventloop) loopRun(lockOSThread bool) {
err = el.loopReadUDP(v.c)
case *stderr:
err = el.loopError(v.c, v.err)
case wakeReq:
err = el.loopWake(v.c)
case func() error:
err = v()
case *signalTask:
err = v.run(v.c)
signalTaskPool.Put(i)
case *dataTask:
_, err = v.run(v.buf)
dataTaskPool.Put(i)
}

if err == errors.ErrServerShutdown {
Expand Down Expand Up @@ -183,7 +185,7 @@ func (el *eventloop) loopTicker(ctx context.Context) {
for {
delay, action = el.eventHandler.Tick()
if action == Shutdown {
el.ch <- func() error { return errors.ErrServerShutdown }
el.ch <- errors.ErrServerShutdown
// logging.Debugf("stopping ticker in event-loop(%d) from Tick()", el.idx)
}
if timer == nil {
Expand Down
4 changes: 2 additions & 2 deletions server_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"sync"
"sync/atomic"

errors2 "github.com/panjf2000/gnet/errors"
gerrors "github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal/logging"
)

Expand Down Expand Up @@ -124,7 +124,7 @@ func (svr *server) stop(s Server) {

// Notify all loops to close.
svr.lb.iterate(func(i int, el *eventloop) bool {
el.ch <- errors2.ErrServerShutdown
el.ch <- gerrors.ErrServerShutdown
return true
})

Expand Down

0 comments on commit 477bb4f

Please sign in to comment.