Skip to content

Commit

Permalink
Merge branch 'panjf2000:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
0-haha authored Sep 11, 2023
2 parents 9b98998 + 7699eae commit d25b6ab
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 43 deletions.
22 changes: 20 additions & 2 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
package gnet

import (
"errors"
"net"
"runtime"
"sync/atomic"

errorx "github.com/panjf2000/gnet/v2/pkg/errors"
)

func (eng *engine) listen() (err error) {
Expand All @@ -34,7 +38,15 @@ func (eng *engine) listen() (err error) {
n, addr, e := eng.ln.pc.ReadFrom(buffer[:])
if e != nil {
err = e
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: errors.Join() is not supported until Go 1.20,
// we will uncomment this line after we bump up the
// minimal supported go version to 1.20.
// err = errors.Join(err, errorx.ErrEngineShutdown)
}
return
}

Expand All @@ -46,7 +58,13 @@ func (eng *engine) listen() (err error) {
tc, e := eng.ln.ln.Accept()
if e != nil {
err = e
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: ditto.
// err = errors.Join(err, errorx.ErrEngineShutdown)
}
return
}
el := eng.eventLoops.next(tc.RemoteAddr())
Expand Down
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

gerr "github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
Expand All @@ -30,8 +30,8 @@ type clientEvents struct {

func (ev *clientEvents) OnBoot(e Engine) Action {
fd, err := e.Dup()
require.ErrorIsf(ev.tester, err, gerr.ErrEmptyEngine, "expected error: %v, but got: %v",
gerr.ErrUnsupportedOp, err)
require.ErrorIsf(ev.tester, err, errorx.ErrEmptyEngine, "expected error: %v, but got: %v",
errorx.ErrUnsupportedOp, err)
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
return None
}
Expand Down Expand Up @@ -80,8 +80,8 @@ func (ev *clientEvents) OnTick() (delay time.Duration, action Action) {

func (ev *clientEvents) OnShutdown(e Engine) {
fd, err := e.Dup()
require.ErrorIsf(ev.tester, err, gerr.ErrEmptyEngine, "expected error: %v, but got: %v",
gerr.ErrUnsupportedOp, err)
require.ErrorIsf(ev.tester, err, errorx.ErrEmptyEngine, "expected error: %v, but got: %v",
errorx.ErrUnsupportedOp, err)
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
}

Expand Down
6 changes: 3 additions & 3 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/buffer/ring"
gerrors "github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
)

Expand Down Expand Up @@ -126,7 +126,7 @@ func (cli *Client) Start() error {

// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.UrgentTrigger(func(_ interface{}) error { return gerrors.ErrEngineShutdown }, nil))
logging.Error(cli.el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
// Stop the ticker.
if cli.opts.Ticker {
cli.el.engine.ticker.cancel()
Expand Down Expand Up @@ -215,7 +215,7 @@ func (cli *Client) Enroll(c net.Conn) (Conn, error) {
}
gc = newUDPConn(dupFD, cli.el, c.LocalAddr(), sockAddr, true)
default:
return nil, gerrors.ErrUnsupportedProtocol
return nil, errorx.ErrUnsupportedProtocol
}
err = cli.el.poller.UrgentTrigger(cli.el.register, gc)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/buffer/elastic"
gerrors "github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
bsPool "github.com/panjf2000/gnet/v2/pkg/pool/byteslice"
"github.com/panjf2000/gnet/v2/pkg/tls"
Expand Down Expand Up @@ -406,7 +406,7 @@ func (c *conn) WriteTCP(p []byte) (int, error) {

func (c *conn) Writev(bs [][]byte) (int, error) {
if c.isDatagram {
return 0, gerrors.ErrUnsupportedOp
return 0, errorx.ErrUnsupportedOp
}
if c.tlsconn != nil {
return c.writevTLS(bs)
Expand Down Expand Up @@ -491,7 +491,7 @@ func (c *conn) AsyncWrite(buf []byte, callback AsyncCallback) error {

func (c *conn) AsyncWritev(bs [][]byte, callback AsyncCallback) error {
if c.isDatagram {
return gerrors.ErrUnsupportedOp
return errorx.ErrUnsupportedOp
}
return c.loop.poller.Trigger(c.asyncWritev, &asyncWritevHook{callback, bs})
}
Expand Down Expand Up @@ -524,15 +524,15 @@ func (c *conn) Close() error {
}

func (*conn) SetDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
return errorx.ErrUnsupportedOp
}

func (*conn) SetReadDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
return errorx.ErrUnsupportedOp
}

func (*conn) SetWriteDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
return errorx.ErrUnsupportedOp
}

func (c *conn) UpgradeTLS(config *tls.Config) (err error) {
Expand Down
12 changes: 7 additions & 5 deletions engine_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gnet

import (
"context"
"errors"
"runtime"
"sync"
"sync/atomic"
Expand All @@ -33,8 +34,9 @@ type engine struct {
ctx context.Context
cancel context.CancelFunc
}
inShutdown int32 // whether the engine is in shutdown
workerPool struct {
inShutdown int32 // whether the engine is in shutdown
beingShutdown int32 // whether the engine is being shutdown
workerPool struct {
*errgroup.Group

shutdownCtx context.Context
Expand All @@ -50,10 +52,11 @@ func (eng *engine) isInShutdown() bool {

// shutdown signals the engine to shut down.
func (eng *engine) shutdown(err error) {
if err != nil && err != errorx.ErrEngineShutdown {
if err != nil && !errors.Is(err, errorx.ErrEngineShutdown) {
eng.opts.Logger.Errorf("engine is being shutdown with error: %v", err)
}
eng.workerPool.shutdown()
atomic.StoreInt32(&eng.beingShutdown, 1)
}

func (eng *engine) closeEventLoops() {
Expand Down Expand Up @@ -91,7 +94,6 @@ func (eng *engine) start(numEventLoop int) error {
func (eng *engine) stop(engine Engine) error {
<-eng.workerPool.shutdownCtx.Done()

eng.opts.Logger.Infof("engine is being shutdown...")
eng.eventHandler.OnShutdown(engine)

if eng.ticker.cancel != nil {
Expand All @@ -100,7 +102,7 @@ func (eng *engine) stop(engine Engine) error {

eng.closeEventLoops()

if err := eng.workerPool.Wait(); err != nil {
if err := eng.workerPool.Wait(); err != nil && !errors.Is(err, errorx.ErrEngineShutdown) {
eng.opts.Logger.Errorf("engine shutdown error: %v", err)
}

Expand Down
18 changes: 9 additions & 9 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"github.com/panjf2000/gnet/v2/internal/io"
"github.com/panjf2000/gnet/v2/internal/netpoll"
gerrors "github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
)

Expand Down Expand Up @@ -189,7 +189,7 @@ func (el *eventloop) read(c *conn) error {
case Close:
return el.close(c, nil)
case Shutdown:
return gerrors.ErrEngineShutdown
return errorx.ErrEngineShutdown
}
_, _ = c.inboundBuffer.Write(c.buffer)
c.buffer = c.buffer[:0]
Expand Down Expand Up @@ -239,7 +239,7 @@ func (el *eventloop) close(c *conn, err error) (rerr error) {
el.connections.delConn(c)
}
if el.eventHandler.OnClose(c, err) == Shutdown {
return gerrors.ErrEngineShutdown
return errorx.ErrEngineShutdown
}
c.release()
return
Expand Down Expand Up @@ -290,7 +290,7 @@ func (el *eventloop) close(c *conn, err error) (rerr error) {

el.connections.delConn(c)
if el.eventHandler.OnClose(c, err) == Shutdown {
rerr = gerrors.ErrEngineShutdown
rerr = errorx.ErrEngineShutdown
}
c.release()

Expand Down Expand Up @@ -326,7 +326,7 @@ func (el *eventloop) ticker(ctx context.Context) {
switch action {
case None:
case Shutdown:
err := el.poller.UrgentTrigger(func(_ interface{}) error { return gerrors.ErrEngineShutdown }, nil)
err := el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
el.getLogger().Debugf("stopping ticker in event-loop(%d) from OnTick(), UrgentTrigger:%v", el.idx, err)
}
if timer == nil {
Expand All @@ -350,7 +350,7 @@ func (el *eventloop) handleAction(c *conn, action Action) error {
case Close:
return el.close(c, nil)
case Shutdown:
return gerrors.ErrEngineShutdown
return errorx.ErrEngineShutdown
default:
return nil
}
Expand All @@ -377,7 +377,7 @@ func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent) error {
c.release()
}
if action == Shutdown {
return gerrors.ErrEngineShutdown
return errorx.ErrEngineShutdown
}
return nil
}
Expand All @@ -387,7 +387,7 @@ func (el *eventloop) execCmd(itf interface{}) (err error) {
cmd := itf.(*asyncCmd)
c := el.connections.getConnByGFD(cmd.fd)
if c == nil || c.gfd != cmd.fd {
return gerrors.ErrInvalidConn
return errorx.ErrInvalidConn
}
defer func() {
Expand All @@ -406,7 +406,7 @@ func (el *eventloop) execCmd(itf interface{}) (err error) {
case asyncCmdWritev:
_, err = c.Writev(cmd.arg.([][]byte))
default:
return gerrors.ErrUnsupportedOp
return errorx.ErrUnsupportedOp
}
return
}
Expand Down
6 changes: 6 additions & 0 deletions gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ type Writer interface {
// AsyncWrite writes bytes to peer asynchronously, it's goroutine-safe,
// you don't have to invoke it within any method in EventHandler,
// usually you would call it in an individual goroutine.
//
// Note that it will go synchronously with UDP, so it is needless to call
// this asynchronous method, we may disable this method for UDP and just
// return ErrUnsupportedOp in the future, therefore, please don't rely on
// this method to do something important under UDP, if you're working with UDP,
// just call Conn.Write to send back your data.
AsyncWrite(buf []byte, callback AsyncCallback) (err error)

// AsyncWritev writes multiple byte slices to peer asynchronously,
Expand Down
24 changes: 17 additions & 7 deletions gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

gerr "github.com/panjf2000/gnet/v2/pkg/errors"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
Expand Down Expand Up @@ -552,6 +552,11 @@ func (t *testWakeConnServer) OnTick() (delay time.Duration, action Action) {
}

func testWakeConn(t *testing.T, network, addr string) {
currentLogger, currentFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher()
t.Cleanup(func() {
logging.SetDefaultLoggerAndFlusher(currentLogger, currentFlusher) // restore
})

svr := &testWakeConnServer{tester: t, network: network, addr: addr, conn: make(chan Conn, 1)}
logger := zap.NewExample()
err := Run(svr, network+"://"+addr,
Expand Down Expand Up @@ -809,8 +814,8 @@ func testShutdownActionOnOpen(t *testing.T, network, addr string) {
err := Run(events, network+"://"+addr, WithTicker(true))
assert.NoError(t, err)
_, err = events.eng.Dup()
assert.ErrorIsf(t, err, gerr.ErrEngineInShutdown, "expected error: %v, but got: %v",
gerr.ErrEngineInShutdown, err)
assert.ErrorIsf(t, err, errorx.ErrEngineInShutdown, "expected error: %v, but got: %v",
errorx.ErrEngineInShutdown, err)
}

func TestUDPShutdown(t *testing.T) {
Expand Down Expand Up @@ -882,7 +887,7 @@ func (t *testCloseConnectionServer) OnTraffic(c Conn) (action Action) {
go func() {
time.Sleep(time.Second)
_ = c.CloseWithCallback(func(c Conn, err error) error {
assert.ErrorIsf(t.tester, err, gerr.ErrEngineShutdown, "should be engine shutdown error")
assert.ErrorIsf(t.tester, err, errorx.ErrEngineShutdown, "should be engine shutdown error")
return nil
})
}()
Expand Down Expand Up @@ -918,7 +923,7 @@ func testCloseConnection(t *testing.T, network, addr string) {

func TestServerOptionsCheck(t *testing.T) {
err := Run(&BuiltinEventEngine{}, "tcp://:3500", WithNumEventLoop(10001), WithLockOSThread(true))
assert.EqualError(t, err, gerr.ErrTooManyEventLoopThreads.Error(), "error returned with LockOSThread option")
assert.EqualError(t, err, errorx.ErrTooManyEventLoopThreads.Error(), "error returned with LockOSThread option")
}

func TestStopServer(t *testing.T) {
Expand Down Expand Up @@ -1061,7 +1066,7 @@ func testEngineStop(t *testing.T, network, addr string) {
require.Greater(t, events2.exchngCount, int64(0))
require.Equal(t, int64(2+1+5+1), events1.exchngCount+events2.exchngCount)
// stop an already stopped engine
require.Equal(t, gerr.ErrEngineInShutdown, events1.eng.Stop(context.Background()))
require.Equal(t, errorx.ErrEngineInShutdown, events1.eng.Stop(context.Background()))
}

// Test should not panic when we wake-up server_closed conn.
Expand Down Expand Up @@ -1145,6 +1150,11 @@ func (t *testMultiInstLoggerRaceServer) OnBoot(_ Engine) (action Action) {
}

func TestMultiInstLoggerRace(t *testing.T) {
currentLogger, currentFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher()
t.Cleanup(func() {
logging.SetDefaultLoggerAndFlusher(currentLogger, currentFlusher) // restore
})

logger1, _ := zap.NewDevelopment()
events1 := new(testMultiInstLoggerRaceServer)
g := errgroup.Group{}
Expand All @@ -1160,7 +1170,7 @@ func TestMultiInstLoggerRace(t *testing.T) {
return err
})

assert.ErrorIs(t, g.Wait(), gerr.ErrUnsupportedProtocol)
assert.ErrorIs(t, g.Wait(), errorx.ErrUnsupportedProtocol)
}

var errIncompletePacket = errors.New("incomplete packet")
Expand Down
5 changes: 1 addition & 4 deletions listener_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ func (l *listener) close() {
return
}
logging.Error(os.NewSyscallError("close", l.ln.Close()))
if l.network == "unix" {
logging.Error(os.RemoveAll(l.address))
}
})
}

Expand Down Expand Up @@ -124,7 +121,7 @@ func initListener(network, addr string, options *Options) (l *listener, err erro
}
l.addr = l.pc.LocalAddr()
case "unix":
logging.Error(os.Remove(addr))
_ = os.Remove(addr)
fallthrough
case "tcp", "tcp4", "tcp6":
if l.ln, err = lc.Listen(context.Background(), network, addr); err != nil {
Expand Down
Loading

0 comments on commit d25b6ab

Please sign in to comment.