Skip to content

Commit

Permalink
opt: optimize Conn.Next and Conn.Peek
Browse files Browse the repository at this point in the history
Besides, add tests of partial read.
  • Loading branch information
panjf2000 committed Nov 11, 2024
1 parent bdd3fb6 commit 3e1efa6
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 31 deletions.
10 changes: 7 additions & 3 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,16 +318,18 @@ func (c *conn) Next(n int) (buf []byte, err error) {
} else if n <= 0 {
n = totalLen
}

if c.inboundBuffer.IsEmpty() {
buf = c.buffer[:n]
c.buffer = c.buffer[n:]
return
}

head, tail := c.inboundBuffer.Peek(n)
defer c.inboundBuffer.Discard(n) //nolint:errcheck
c.loop.cache.Reset()
c.loop.cache.Write(head)
if len(head) >= n {
if len(head) == n {

Check warning on line 332 in connection_unix.go

View check run for this annotation

Codecov / codecov/patch

connection_unix.go#L332

Added line #L332 was not covered by tests
return c.loop.cache.Bytes(), err
}
c.loop.cache.Write(tail)
Expand All @@ -348,12 +350,14 @@ func (c *conn) Peek(n int) (buf []byte, err error) {
} else if n <= 0 {
n = totalLen
}

if c.inboundBuffer.IsEmpty() {
return c.buffer[:n], err
}

head, tail := c.inboundBuffer.Peek(n)
if len(head) >= n {
return head[:n], err
if len(head) == n {
return head, err
}
c.loop.cache.Reset()
c.loop.cache.Write(head)
Expand Down
78 changes: 50 additions & 28 deletions gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/binary"
"errors"
"io"
"math"
"math/rand"
"net"
"path/filepath"
Expand Down Expand Up @@ -1542,7 +1543,8 @@ type simServer struct {
multicore bool
nclients int
packetSize int
packetBatch int
batchWrite int
batchRead int
started int32
connected int32
disconnected int32
Expand Down Expand Up @@ -1579,7 +1581,7 @@ func (s *simServer) OnClose(_ Conn, err error) (action Action) {
func (s *simServer) OnTraffic(c Conn) (action Action) {
codec := c.Context().(*testCodec)
var packets [][]byte
for {
for i := 0; i < s.batchRead; i++ {
data, err := codec.Decode(c)
if errors.Is(err, errIncompletePacket) {
break
Expand All @@ -1596,14 +1598,18 @@ func (s *simServer) OnTraffic(c Conn) (action Action) {
} else if n == 1 {
_, _ = c.Write(packets[0])
}
if len(packets) == s.batchRead && c.InboundBuffered() > 0 {
err := c.Wake(nil) // wake up the connection manually to avoid missing the leftover data
assert.NoError(s.tester, err)
}
return
}

func (s *simServer) OnTick() (delay time.Duration, action Action) {
if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
for i := 0; i < s.nclients; i++ {
go func() {
runSimClient(s.tester, s.network, s.addr, s.packetSize, s.packetBatch)
runSimClient(s.tester, s.network, s.addr, s.packetSize, s.batchWrite)
}()
}
}
Expand Down Expand Up @@ -1651,11 +1657,14 @@ func (codec testCodec) Encode(buf []byte) ([]byte, error) {
return data, nil
}

func (codec *testCodec) Decode(c Conn) ([]byte, error) {
func (codec testCodec) Decode(c Conn) ([]byte, error) {
bodyOffset := magicNumberSize + bodySize
buf, _ := c.Peek(bodyOffset)
if len(buf) < bodyOffset {
return nil, errIncompletePacket
buf, err := c.Peek(bodyOffset)
if err != nil {
if errors.Is(err, io.ErrShortBuffer) {
err = errIncompletePacket
}
return nil, err
}

if !bytes.Equal(magicNumberBytes, buf[:magicNumberSize]) {
Expand All @@ -1664,13 +1673,18 @@ func (codec *testCodec) Decode(c Conn) ([]byte, error) {

bodyLen := binary.BigEndian.Uint32(buf[magicNumberSize:bodyOffset])
msgLen := bodyOffset + int(bodyLen)
if c.InboundBuffered() < msgLen {
return nil, errIncompletePacket
buf, err = c.Peek(msgLen)
if err != nil {
if errors.Is(err, io.ErrShortBuffer) {
err = errIncompletePacket
}
return nil, err
}
buf, _ = c.Peek(msgLen)
body := make([]byte, bodyLen)
copy(body, buf[bodyOffset:msgLen])
_, _ = c.Discard(msgLen)

return buf[bodyOffset:msgLen], nil
return body, nil
}

func (codec testCodec) Unpack(buf []byte) ([]byte, error) {
Expand All @@ -1693,41 +1707,48 @@ func (codec testCodec) Unpack(buf []byte) ([]byte, error) {
}

func TestSimServer(t *testing.T) {
t.Run("packet-size=64,batch=200", func(t *testing.T) {
runSimServer(t, ":7200", true, 10, 64, 200, -1)
})
t.Run("packet-size=128,batch=100", func(t *testing.T) {
runSimServer(t, ":7200", false, 10, 128, 100)
runSimServer(t, ":7201", false, 10, 128, 100, 10)
})
t.Run("packet-size=256,batch=50", func(t *testing.T) {
runSimServer(t, ":7201", true, 10, 256, 50)
runSimServer(t, ":7202", true, 10, 256, 50, -1)
})
t.Run("packet-size=512,batch=30", func(t *testing.T) {
runSimServer(t, ":7202", false, 10, 512, 30)
runSimServer(t, ":7203", false, 10, 512, 30, 3)
})
t.Run("packet-size=1024,batch=20", func(t *testing.T) {
runSimServer(t, ":7203", true, 10, 1024, 20)
runSimServer(t, ":7204", true, 10, 1024, 20, -1)
})
t.Run("packet-size=64*1024,batch=10", func(t *testing.T) {
runSimServer(t, ":7204", false, 10, 64*1024, 10)
runSimServer(t, ":7205", false, 10, 64*1024, 10, 1)
})
t.Run("packet-size=128*1024,batch=5", func(t *testing.T) {
runSimServer(t, ":7205", true, 10, 128*1024, 5)
runSimServer(t, ":7206", true, 10, 128*1024, 5, -1)
})
t.Run("packet-size=512*1024,batch=3", func(t *testing.T) {
runSimServer(t, ":7206", false, 10, 512*1024, 3)
runSimServer(t, ":7207", false, 10, 512*1024, 3, 1)
})
t.Run("packet-size=1024*1024,batch=2", func(t *testing.T) {
runSimServer(t, ":7207", true, 10, 1024*1024, 2)
runSimServer(t, ":7208", true, 10, 1024*1024, 2, -1)
})
}

func runSimServer(t *testing.T, addr string, et bool, nclients, packetSize, packetBatch int) {
func runSimServer(t *testing.T, addr string, et bool, nclients, packetSize, batchWrite, batchRead int) {
ts := &simServer{
tester: t,
network: "tcp",
addr: addr,
multicore: true,
nclients: nclients,
packetSize: packetSize,
packetBatch: packetBatch,
tester: t,
network: "tcp",
addr: addr,
multicore: true,
nclients: nclients,
packetSize: packetSize,
batchWrite: batchWrite,
batchRead: batchRead,
}
if batchRead < 0 {
ts.batchRead = math.MaxInt32 // unlimited read batch
}
err := Run(ts,
ts.network+"://"+ts.addr,
Expand Down Expand Up @@ -1789,6 +1810,7 @@ func batchSendAndRecv(t *testing.T, c net.Conn, rd *bufio.Reader, packetSize, ba
for i, req := range requests {
rsp, err := codec.Unpack(respPacket[i*packetLen:])
require.NoError(t, err)
require.Equalf(t, req, rsp, "request and response mismatch, packet size: %d, batch: %d", packetSize, batch)
require.Equalf(t, req, rsp, "request and response mismatch, packet size: %d, batch: %d, round: %d",
packetSize, batch, i)
}
}

0 comments on commit 3e1efa6

Please sign in to comment.