Skip to content

Commit

Permalink
Merge pull request #502 from panjf2000/dev
Browse files Browse the repository at this point in the history
release: v2.3.2
  • Loading branch information
panjf2000 authored Sep 10, 2023
2 parents 332716b + 7699eae commit 2472c3f
Show file tree
Hide file tree
Showing 16 changed files with 98 additions and 62 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go: ['1.17', '1.18', '1.19', '1.20']
go: ['1.17', '1.21']
os:
- ubuntu-latest
- macos-latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_gc_opt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go: ['1.17', '1.18', '1.19', '1.20']
go: ['1.17', '1.21']
os:
- ubuntu-latest
- macos-latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_poll_opt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go: ['1.17', '1.18', '1.19', '1.20']
go: ['1.17', '1.21']
os: [ubuntu-latest, macos-latest]
name: Go ${{ matrix.go }} @ ${{ matrix.os }}
runs-on: ${{ matrix.os }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_poll_opt_gc_opt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go: ['1.17', '1.18', '1.19', '1.20']
go: ['1.17', '1.21']
os: [ubuntu-latest, macos-latest]
name: Go ${{ matrix.go }} @ ${{ matrix.os }}
runs-on: ${{ matrix.os }}
Expand Down
22 changes: 20 additions & 2 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
package gnet

import (
"errors"
"net"
"runtime"
"sync/atomic"

errorx "github.com/panjf2000/gnet/v2/pkg/errors"
)

func (eng *engine) listen() (err error) {
Expand All @@ -34,7 +38,15 @@ func (eng *engine) listen() (err error) {
n, addr, e := eng.ln.pc.ReadFrom(buffer[:])
if e != nil {
err = e
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: errors.Join() is not supported until Go 1.20,
// we will uncomment this line after we bump up the
// minimal supported go version to 1.20.
// err = errors.Join(err, errorx.ErrEngineShutdown)
}
return
}

Expand All @@ -46,7 +58,13 @@ func (eng *engine) listen() (err error) {
tc, e := eng.ln.ln.Accept()
if e != nil {
err = e
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: ditto.
// err = errors.Join(err, errorx.ErrEngineShutdown)
}
return
}
el := eng.eventLoops.next(tc.RemoteAddr())
Expand Down
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

gerr "github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
Expand All @@ -30,8 +30,8 @@ type clientEvents struct {

func (ev *clientEvents) OnBoot(e Engine) Action {
fd, err := e.Dup()
require.ErrorIsf(ev.tester, err, gerr.ErrEmptyEngine, "expected error: %v, but got: %v",
gerr.ErrUnsupportedOp, err)
require.ErrorIsf(ev.tester, err, errorx.ErrEmptyEngine, "expected error: %v, but got: %v",
errorx.ErrUnsupportedOp, err)
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
return None
}
Expand Down Expand Up @@ -80,8 +80,8 @@ func (ev *clientEvents) OnTick() (delay time.Duration, action Action) {

func (ev *clientEvents) OnShutdown(e Engine) {
fd, err := e.Dup()
require.ErrorIsf(ev.tester, err, gerr.ErrEmptyEngine, "expected error: %v, but got: %v",
gerr.ErrUnsupportedOp, err)
require.ErrorIsf(ev.tester, err, errorx.ErrEmptyEngine, "expected error: %v, but got: %v",
errorx.ErrUnsupportedOp, err)
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
}

Expand Down
6 changes: 3 additions & 3 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/buffer/ring"
gerrors "github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
)

Expand Down Expand Up @@ -126,7 +126,7 @@ func (cli *Client) Start() error {

// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.UrgentTrigger(func(_ interface{}) error { return gerrors.ErrEngineShutdown }, nil))
logging.Error(cli.el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
// Stop the ticker.
if cli.opts.Ticker {
cli.el.engine.ticker.cancel()
Expand Down Expand Up @@ -215,7 +215,7 @@ func (cli *Client) Enroll(c net.Conn) (Conn, error) {
}
gc = newUDPConn(dupFD, cli.el, c.LocalAddr(), sockAddr, true)
default:
return nil, gerrors.ErrUnsupportedProtocol
return nil, errorx.ErrUnsupportedProtocol
}
err = cli.el.poller.UrgentTrigger(cli.el.register, gc)
if err != nil {
Expand Down
27 changes: 15 additions & 12 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/buffer/elastic"
gerrors "github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
bsPool "github.com/panjf2000/gnet/v2/pkg/pool/byteslice"
)
Expand Down Expand Up @@ -355,7 +355,7 @@ func (c *conn) Write(p []byte) (int, error) {

func (c *conn) Writev(bs [][]byte) (int, error) {
if c.isDatagram {
return 0, gerrors.ErrUnsupportedOp
return 0, errorx.ErrUnsupportedOp
}
return c.writev(bs)
}
Expand Down Expand Up @@ -422,19 +422,22 @@ func (c *conn) SetKeepAlivePeriod(d time.Duration) error {

func (c *conn) AsyncWrite(buf []byte, callback AsyncCallback) error {
if c.isDatagram {
defer func() {
if callback != nil {
_ = callback(nil, nil)
}
}()
return c.sendTo(buf)
err := c.sendTo(buf)
// TODO: it will not go asynchronously with UDP, so calling a callback is needless,
// we may remove this branch in the future, please don't rely on the callback
// to do something important under UDP, if you're working with UDP, just call Conn.Write
// to send back your data.
if callback != nil {
_ = callback(nil, nil)
}
return err
}
return c.loop.poller.Trigger(c.asyncWrite, &asyncWriteHook{callback, buf})
}

func (c *conn) AsyncWritev(bs [][]byte, callback AsyncCallback) error {
if c.isDatagram {
return gerrors.ErrUnsupportedOp
return errorx.ErrUnsupportedOp
}
return c.loop.poller.Trigger(c.asyncWritev, &asyncWritevHook{callback, bs})
}
Expand Down Expand Up @@ -467,13 +470,13 @@ func (c *conn) Close() error {
}

func (*conn) SetDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
return errorx.ErrUnsupportedOp
}

func (*conn) SetReadDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
return errorx.ErrUnsupportedOp
}

func (*conn) SetWriteDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
return errorx.ErrUnsupportedOp
}
12 changes: 7 additions & 5 deletions engine_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gnet

import (
"context"
"errors"
"runtime"
"sync"
"sync/atomic"
Expand All @@ -33,8 +34,9 @@ type engine struct {
ctx context.Context
cancel context.CancelFunc
}
inShutdown int32 // whether the engine is in shutdown
workerPool struct {
inShutdown int32 // whether the engine is in shutdown
beingShutdown int32 // whether the engine is being shutdown
workerPool struct {
*errgroup.Group

shutdownCtx context.Context
Expand All @@ -50,10 +52,11 @@ func (eng *engine) isInShutdown() bool {

// shutdown signals the engine to shut down.
func (eng *engine) shutdown(err error) {
if err != nil && err != errorx.ErrEngineShutdown {
if err != nil && !errors.Is(err, errorx.ErrEngineShutdown) {
eng.opts.Logger.Errorf("engine is being shutdown with error: %v", err)
}
eng.workerPool.shutdown()
atomic.StoreInt32(&eng.beingShutdown, 1)
}

func (eng *engine) closeEventLoops() {
Expand Down Expand Up @@ -91,7 +94,6 @@ func (eng *engine) start(numEventLoop int) error {
func (eng *engine) stop(engine Engine) error {
<-eng.workerPool.shutdownCtx.Done()

eng.opts.Logger.Infof("engine is being shutdown...")
eng.eventHandler.OnShutdown(engine)

if eng.ticker.cancel != nil {
Expand All @@ -100,7 +102,7 @@ func (eng *engine) stop(engine Engine) error {

eng.closeEventLoops()

if err := eng.workerPool.Wait(); err != nil {
if err := eng.workerPool.Wait(); err != nil && !errors.Is(err, errorx.ErrEngineShutdown) {
eng.opts.Logger.Errorf("engine shutdown error: %v", err)
}

Expand Down
18 changes: 9 additions & 9 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"github.com/panjf2000/gnet/v2/internal/io"
"github.com/panjf2000/gnet/v2/internal/netpoll"
gerrors "github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
)

Expand Down Expand Up @@ -115,7 +115,7 @@ func (el *eventloop) read(c *conn) error {
case Close:
return el.close(c, nil)
case Shutdown:
return gerrors.ErrEngineShutdown
return errorx.ErrEngineShutdown
}
_, _ = c.inboundBuffer.Write(c.buffer)
c.buffer = c.buffer[:0]
Expand Down Expand Up @@ -165,7 +165,7 @@ func (el *eventloop) close(c *conn, err error) (rerr error) {
el.connections.delConn(c)
}
if el.eventHandler.OnClose(c, err) == Shutdown {
return gerrors.ErrEngineShutdown
return errorx.ErrEngineShutdown
}
c.release()
return
Expand Down Expand Up @@ -206,7 +206,7 @@ func (el *eventloop) close(c *conn, err error) (rerr error) {

el.connections.delConn(c)
if el.eventHandler.OnClose(c, err) == Shutdown {
rerr = gerrors.ErrEngineShutdown
rerr = errorx.ErrEngineShutdown
}
c.release()

Expand Down Expand Up @@ -242,7 +242,7 @@ func (el *eventloop) ticker(ctx context.Context) {
switch action {
case None:
case Shutdown:
err := el.poller.UrgentTrigger(func(_ interface{}) error { return gerrors.ErrEngineShutdown }, nil)
err := el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
el.getLogger().Debugf("stopping ticker in event-loop(%d) from OnTick(), UrgentTrigger:%v", el.idx, err)
}
if timer == nil {
Expand All @@ -266,7 +266,7 @@ func (el *eventloop) handleAction(c *conn, action Action) error {
case Close:
return el.close(c, nil)
case Shutdown:
return gerrors.ErrEngineShutdown
return errorx.ErrEngineShutdown
default:
return nil
}
Expand All @@ -293,7 +293,7 @@ func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent) error {
c.release()
}
if action == Shutdown {
return gerrors.ErrEngineShutdown
return errorx.ErrEngineShutdown
}
return nil
}
Expand All @@ -303,7 +303,7 @@ func (el *eventloop) execCmd(itf interface{}) (err error) {
cmd := itf.(*asyncCmd)
c := el.connections.getConnByGFD(cmd.fd)
if c == nil || c.gfd != cmd.fd {
return gerrors.ErrInvalidConn
return errorx.ErrInvalidConn
}
defer func() {
Expand All @@ -322,7 +322,7 @@ func (el *eventloop) execCmd(itf interface{}) (err error) {
case asyncCmdWritev:
_, err = c.Writev(cmd.arg.([][]byte))
default:
return gerrors.ErrUnsupportedOp
return errorx.ErrUnsupportedOp
}
return
}
Expand Down
6 changes: 6 additions & 0 deletions gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ type Writer interface {
// AsyncWrite writes bytes to peer asynchronously, it's goroutine-safe,
// you don't have to invoke it within any method in EventHandler,
// usually you would call it in an individual goroutine.
//
// Note that it will go synchronously with UDP, so it is needless to call
// this asynchronous method, we may disable this method for UDP and just
// return ErrUnsupportedOp in the future, therefore, please don't rely on
// this method to do something important under UDP, if you're working with UDP,
// just call Conn.Write to send back your data.
AsyncWrite(buf []byte, callback AsyncCallback) (err error)

// AsyncWritev writes multiple byte slices to peer asynchronously,
Expand Down
Loading

0 comments on commit 2472c3f

Please sign in to comment.