Skip to content

Commit

Permalink
bug: fix the blocking issue on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Nov 12, 2024
1 parent 3e1efa6 commit 2e21040
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 91 deletions.
4 changes: 3 additions & 1 deletion acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func (eng *engine) listenStream(ln net.Listener) (err error) {
el.ch <- &netErr{c, err}
return
}
el.ch <- packTCPConn(c, buffer[:n])
pc := packTCPConn(c, buffer[:n])
el.ch <- pc
pc.done <- struct{}{}
}
}(c, tc, el)
}
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,13 +591,13 @@ func (ev *clientEventsForWake) OnTraffic(c Conn) (action Action) {
assert.Nilf(ev.tester, buf, "expected: %v, but got: %v", nil, buf)
assert.ErrorIsf(ev.tester, err, io.ErrShortBuffer, "expected error: %v, but got: %v", io.ErrShortBuffer, err)
buf, err = c.Next(-1)
assert.Nilf(ev.tester, buf, "expected: %v, but got: %v", nil, buf)
assert.Emptyf(ev.tester, buf, "expected an empty slice, but got: %v", buf)
assert.NoErrorf(ev.tester, err, "expected: %v, but got: %v", nil, err)
buf, err = c.Peek(10)
assert.Nilf(ev.tester, buf, "expected: %v, but got: %v", nil, buf)
assert.ErrorIsf(ev.tester, err, io.ErrShortBuffer, "expected error: %v, but got: %v", io.ErrShortBuffer, err)
buf, err = c.Peek(-1)
assert.Nilf(ev.tester, buf, "expected: %v, but got: %v", nil, buf)
assert.Emptyf(ev.tester, buf, "expected an empty slice, but got: %v", buf)
assert.NoErrorf(ev.tester, err, "expected: %v, but got: %v", nil, err)
n, err = c.Discard(10)
assert.Zerof(ev.tester, n, "expected: %v, but got: %v", 0, n)
Expand Down
8 changes: 6 additions & 2 deletions client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err err
el.ch <- &netErr{c, err}
return
}
el.ch <- packTCPConn(c, buffer[:n])
pc := packTCPConn(c, buffer[:n])
el.ch <- pc
pc.done <- struct{}{}
}
}(c, nc, cli.el)
gc = c
Expand All @@ -197,7 +199,9 @@ func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err err
}
return
}
el.ch <- packTCPConn(c, buffer[:n])
pc := packTCPConn(c, buffer[:n])
el.ch <- pc
pc.done <- struct{}{}
}
}(c, nc, cli.el)
gc = c
Expand Down
157 changes: 73 additions & 84 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type netErr struct {
}

type tcpConn struct {
c *conn
buf *bbPool.ByteBuffer
c *conn
done chan struct{}
}

type udpConn struct {
Expand All @@ -59,35 +59,23 @@ type conn struct {
}

func packTCPConn(c *conn, buf []byte) *tcpConn {
tc := &tcpConn{c: c, buf: bbPool.Get()}
_, _ = tc.buf.Write(buf)
return tc
}

func unpackTCPConn(tc *tcpConn) {
tc.c.buffer = tc.buf
tc.buf = nil
}

func resetTCPConn(tc *tcpConn) {
bbPool.Put(tc.c.buffer)
tc.c.buffer = nil
_, _ = c.buffer.Write(buf)
return &tcpConn{c: c, done: make(chan struct{})}
}

func packUDPConn(c *conn, buf []byte) *udpConn {
uc := &udpConn{c}
_, _ = uc.c.buffer.Write(buf)
return uc
_, _ = c.buffer.Write(buf)
return &udpConn{c}
}

func newTCPConn(nc net.Conn, el *eventloop) (c *conn) {
c = &conn{
loop: el,
rawConn: nc,
return &conn{
loop: el,
buffer: bbPool.Get(),
rawConn: nc,
localAddr: nc.LocalAddr(),
remoteAddr: nc.RemoteAddr(),
}
c.localAddr = c.rawConn.LocalAddr()
c.remoteAddr = c.rawConn.RemoteAddr()
return
}

func (c *conn) release() {
Expand Down Expand Up @@ -118,18 +106,11 @@ func (c *conn) resetBuffer() {
}

func (c *conn) Read(p []byte) (n int, err error) {
if c.buffer == nil {
if len(p) == 0 {
return 0, nil
}
return 0, io.ErrShortBuffer
}

if c.inboundBuffer.IsEmpty() {
n = copy(p, c.buffer.B)
c.buffer.B = c.buffer.B[n:]
if n == 0 && len(p) > 0 {
err = io.EOF
err = io.ErrShortBuffer
}
return
}
Expand All @@ -144,13 +125,6 @@ func (c *conn) Read(p []byte) (n int, err error) {
}

func (c *conn) Next(n int) (buf []byte, err error) {
if c.buffer == nil {
if n <= 0 {
return nil, nil
}
return nil, io.ErrShortBuffer
}

inBufferLen := c.inboundBuffer.Buffered()
if totalLen := inBufferLen + c.buffer.Len(); n > totalLen {
return nil, io.ErrShortBuffer
Expand All @@ -166,7 +140,7 @@ func (c *conn) Next(n int) (buf []byte, err error) {
defer c.inboundBuffer.Discard(n) //nolint:errcheck
c.loop.cache.Reset()
c.loop.cache.Write(head)
if len(head) >= n {
if len(head) == n {
return c.loop.cache.Bytes(), err
}
c.loop.cache.Write(tail)
Expand All @@ -181,13 +155,6 @@ func (c *conn) Next(n int) (buf []byte, err error) {
}

func (c *conn) Peek(n int) (buf []byte, err error) {
if c.buffer == nil {
if n <= 0 {
return nil, nil
}
return nil, io.ErrShortBuffer
}

inBufferLen := c.inboundBuffer.Buffered()
if totalLen := inBufferLen + c.buffer.Len(); n > totalLen {
return nil, io.ErrShortBuffer
Expand All @@ -198,8 +165,8 @@ func (c *conn) Peek(n int) (buf []byte, err error) {
return c.buffer.B[:n], err
}
head, tail := c.inboundBuffer.Peek(n)
if len(head) >= n {
return head[:n], err
if len(head) == n {
return head, err
}
c.loop.cache.Reset()
c.loop.cache.Write(head)
Expand All @@ -214,10 +181,6 @@ func (c *conn) Peek(n int) (buf []byte, err error) {
}

func (c *conn) Discard(n int) (int, error) {
if c.buffer == nil {
return 0, nil
}

inBufferLen := c.inboundBuffer.Buffered()
tempBufferLen := c.buffer.Len()
if inBufferLen+tempBufferLen < n || n <= 0 {
Expand Down Expand Up @@ -435,13 +398,24 @@ func (c *conn) SetKeepAlivePeriod(d time.Duration) error {
// func (c *conn) Gfd() gfd.GFD { return gfd.GFD{} }

func (c *conn) AsyncWrite(buf []byte, cb AsyncCallback) error {
if cb == nil {
cb = func(c Conn, err error) error { return nil }
}
_, err := c.Write(buf)
c.loop.ch <- func() error {
return cb(c, err)

callback := func() error {
if cb != nil {
_ = cb(c, err)
}
return err
}

select {
case c.loop.ch <- callback:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
c.loop.ch <- callback
}()
}

return nil
}

Expand All @@ -460,46 +434,61 @@ func (c *conn) AsyncWritev(bs [][]byte, cb AsyncCallback) error {
}

func (c *conn) Wake(cb AsyncCallback) error {
if cb == nil {
cb = func(c Conn, err error) error { return nil }
}
c.loop.ch <- func() (err error) {
defer func() {
defer func() {
if err == nil {
err = cb(c, nil)
return
}
_ = cb(c, err)
}()
wakeFn := func() (err error) {
err = c.loop.wake(c)
if cb != nil {
_ = cb(c, err)
}
return
}

select {
case c.loop.ch <- wakeFn:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
c.loop.ch <- wakeFn
}()
return c.loop.wake(c)
}

return nil
}

func (c *conn) Close() error {
c.loop.ch <- func() error {
err := c.loop.close(c, nil)
return err
closeFn := func() error {
return c.loop.close(c, nil)
}

select {
case c.loop.ch <- closeFn:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
c.loop.ch <- closeFn
}()
}

return nil
}

func (c *conn) CloseWithCallback(cb AsyncCallback) error {
if cb == nil {
cb = func(c Conn, err error) error { return nil }
}
c.loop.ch <- func() (err error) {
defer func() {
if err == nil {
err = cb(c, nil)
return
}
closeFn := func() (err error) {
err = c.loop.close(c, nil)
if cb != nil {
_ = cb(c, err)
}
return
}

select {
case c.loop.ch <- closeFn:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
c.loop.ch <- closeFn
}()
return c.loop.close(c, nil)
}

return nil
}

Expand Down
3 changes: 1 addition & 2 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ func (el *eventloop) run() (err error) {
case *openConn:
err = el.open(v)
case *tcpConn:
unpackTCPConn(v)
err = el.read(v.c)
resetTCPConn(v)
<-v.done
case *udpConn:
err = el.readUDP(v.c)
case func() error:
Expand Down

0 comments on commit 2e21040

Please sign in to comment.