Skip to content

Commit

Permalink
opt: optimize the buffer management and network I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Nov 28, 2021
1 parent c85cf92 commit 6aba6d7
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 127 deletions.
2 changes: 0 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package gnet

import (
"encoding/binary"
"log"
"math/rand"
"strings"
"sync"
Expand Down Expand Up @@ -176,7 +175,6 @@ func (s *testCodecClientServer) OnOpened(c Conn) (out []byte, action Action) {
}

func (s *testCodecClientServer) OnClosed(c Conn, err error) (action Action) {
log.Println("close:", c.RemoteAddr(), err)
require.Equal(s.tester, c.Context(), c, "invalid context")
atomic.AddInt32(&s.disconnected, 1)
if atomic.LoadInt32(&s.connected) == atomic.LoadInt32(&s.disconnected) &&
Expand Down
130 changes: 41 additions & 89 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,37 +26,36 @@ import (

"github.com/panjf2000/gnet/internal/netpoll"
"github.com/panjf2000/gnet/internal/socket"
"github.com/panjf2000/gnet/listbuffer"
"github.com/panjf2000/gnet/pool/bytebuffer"
prb "github.com/panjf2000/gnet/pool/ringbuffer"
rbPool "github.com/panjf2000/gnet/pool/ringbuffer"
"github.com/panjf2000/gnet/ringbuffer"
)

type conn struct {
fd int // file descriptor
sa unix.Sockaddr // remote socket address
ctx interface{} // user-defined context
loop *eventloop // connected event-loop
codec ICodec // codec for TCP
buffer []byte // reuse memory of inbound data as a temporary buffer
opened bool // connection opened event fired
localAddr net.Addr // local addr
remoteAddr net.Addr // remote addr
byteBuffer *bytebuffer.ByteBuffer // bytes buffer for buffering current packet and data in ring-buffer
inboundBuffer *ringbuffer.RingBuffer // buffer for data from the peer
outboundBuffer *ringbuffer.RingBuffer // buffer for data that is ready to write to the peer
pollAttachment *netpoll.PollAttachment // connection attachment for poller
fd int // file descriptor
sa unix.Sockaddr // remote socket address
ctx interface{} // user-defined context
loop *eventloop // connected event-loop
codec ICodec // codec for TCP
opened bool // connection opened event fired
localAddr net.Addr // local addr
remoteAddr net.Addr // remote addr
byteBuffer *bytebuffer.ByteBuffer // bytes buffer for buffering current packet and data in ring-buffer
inboundBuffer *ringbuffer.RingBuffer // buffer for data from the peer
outboundBuffer listbuffer.ByteBufferList // buffer for data that is ready to write to the peer
pollAttachment *netpoll.PollAttachment // connection attachment for poller
}

func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, codec ICodec, localAddr, remoteAddr net.Addr) (c *conn) {
c = &conn{
fd: fd,
sa: sa,
loop: el,
codec: codec,
localAddr: localAddr,
remoteAddr: remoteAddr,
inboundBuffer: prb.Get(),
outboundBuffer: prb.Get(),
fd: fd,
sa: sa,
loop: el,
codec: codec,
localAddr: localAddr,
remoteAddr: remoteAddr,
inboundBuffer: rbPool.GetWithSize(ringbuffer.TCPReadBufferSize),
}
c.pollAttachment = netpoll.GetPollAttachment()
c.pollAttachment.FD, c.pollAttachment.Callback = fd, c.handleEvents
Expand All @@ -67,13 +66,11 @@ func (c *conn) releaseTCP() {
c.opened = false
c.sa = nil
c.ctx = nil
c.buffer = nil
c.localAddr = nil
c.remoteAddr = nil
prb.Put(c.inboundBuffer)
prb.Put(c.outboundBuffer)
rbPool.Put(c.inboundBuffer)
c.inboundBuffer = ringbuffer.EmptyRingBuffer
c.outboundBuffer = ringbuffer.EmptyRingBuffer
c.outboundBuffer.Reset()
bytebuffer.Put(c.byteBuffer)
c.byteBuffer = nil
netpoll.PutPollAttachment(c.pollAttachment)
Expand Down Expand Up @@ -101,12 +98,12 @@ func (c *conn) open(buf []byte) error {
c.loop.eventHandler.PreWrite(c)
n, err := unix.Write(c.fd, buf)
if err != nil && err == unix.EAGAIN {
_, _ = c.outboundBuffer.Write(buf)
c.outboundBuffer.PushBytes(buf)
return nil
}

if err == nil && n < len(buf) {
_, _ = c.outboundBuffer.Write(buf[n:])
c.outboundBuffer.PushBytes(buf[n:])
}

return err
Expand All @@ -129,23 +126,23 @@ func (c *conn) write(buf []byte) (err error) {
// If there is pending data in outbound buffer, the current data ought to be appended to the outbound buffer
// for maintaining the sequence of network packets.
if !c.outboundBuffer.IsEmpty() {
_, _ = c.outboundBuffer.Write(packet)
c.outboundBuffer.PushBytes(packet)
return
}

var n int
if n, err = unix.Write(c.fd, packet); err != nil {
// A temporary error occurs, append the data to outbound buffer, writing it back to the peer in the next round.
if err == unix.EAGAIN {
_, _ = c.outboundBuffer.Write(packet)
c.outboundBuffer.PushBytes(packet)
err = c.loop.poller.ModReadWrite(c.pollAttachment)
return
}
return c.loop.loopCloseConn(c, os.NewSyscallError("write", err))
}
// Fail to send all data back to the peer, buffer the leftover data for the next round.
// Failed to send all data back to the peer, buffer the leftover data for the next round.
if n < len(packet) {
_, _ = c.outboundBuffer.Write(packet[n:])
c.outboundBuffer.PushBytes(packet[n:])
err = c.loop.poller.ModReadWrite(c.pollAttachment)
}
return
Expand All @@ -167,76 +164,31 @@ func (c *conn) sendTo(buf []byte) error {
// ================================= Public APIs of gnet.Conn =================================

func (c *conn) Read() []byte {
if c.inboundBuffer.IsEmpty() {
return c.buffer
}
c.byteBuffer = c.inboundBuffer.WithByteBuffer(c.buffer)
return c.byteBuffer.B
buf, _ := c.inboundBuffer.PeekAll()
return buf
}

func (c *conn) ResetBuffer() {
c.buffer = c.buffer[:0]
c.inboundBuffer.Reset()
bytebuffer.Put(c.byteBuffer)
c.byteBuffer = nil
}

func (c *conn) ReadN(n int) (size int, buf []byte) {
func (c *conn) ReadN(n int) (int, []byte) {
inBufferLen := c.inboundBuffer.Length()
tempBufferLen := len(c.buffer)
if totalLen := inBufferLen + tempBufferLen; totalLen < n || n <= 0 {
n = totalLen
}
size = n
if c.inboundBuffer.IsEmpty() {
buf = c.buffer[:n]
return
}
head, tail := c.inboundBuffer.Peek(n)
c.byteBuffer = bytebuffer.Get()
_, _ = c.byteBuffer.Write(head)
_, _ = c.byteBuffer.Write(tail)
if inBufferLen >= n {
buf = c.byteBuffer.B
return
if inBufferLen < n || n <= 0 {
buf, _ := c.inboundBuffer.PeekAll()
return inBufferLen, buf
}

restSize := n - inBufferLen
_, _ = c.byteBuffer.Write(c.buffer[:restSize])
buf = c.byteBuffer.B
return
buf, _ := c.inboundBuffer.Peek(n)
return n, buf
}

func (c *conn) ShiftN(n int) (size int) {
inBufferLen := c.inboundBuffer.Length()
tempBufferLen := len(c.buffer)
if inBufferLen+tempBufferLen < n || n <= 0 {
c.ResetBuffer()
size = inBufferLen + tempBufferLen
return
}
size = n
if c.inboundBuffer.IsEmpty() {
c.buffer = c.buffer[n:]
return
}

bytebuffer.Put(c.byteBuffer)
c.byteBuffer = nil

if inBufferLen >= n {
c.inboundBuffer.Discard(n)
return
}
c.inboundBuffer.Reset()

restSize := n - inBufferLen
c.buffer = c.buffer[restSize:]
return
func (c *conn) ShiftN(n int) int {
c.inboundBuffer.Discard(n)
return n
}

func (c *conn) BufferLength() int {
return c.inboundBuffer.Length() + len(c.buffer)
return c.inboundBuffer.Length()
}

func (c *conn) AsyncWrite(buf []byte) error {
Expand Down
7 changes: 0 additions & 7 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,4 @@ var (
ErrUnsupportedLength = errors.New("unsupported lengthFieldLength. (expected: 1, 2, 3, 4, or 8)")
// ErrTooLessLength occurs when adjusted frame length is less than zero.
ErrTooLessLength = errors.New("adjusted frame length is less than zero")

// =============================================== internal errors ===============================================.

// ErrShortWritev occurs when internal/io.Writev fails to send all data.
ErrShortWritev = errors.New("short writev")
// ErrShortReadv occurs when internal/io.Readv fails to send all data.
ErrShortReadv = errors.New("short readv")
)
35 changes: 9 additions & 26 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,13 @@ func (el *eventloop) loopOpen(c *conn) error {
}

func (el *eventloop) loopRead(c *conn) error {
n, err := unix.Read(c.fd, el.buffer)
n, err := c.inboundBuffer.CopyFromSocket(c.fd, unix.Read)
if n == 0 || err != nil {
if err == unix.EAGAIN {
return nil
}
return el.loopCloseConn(c, os.NewSyscallError("read", err))
}
c.buffer = el.buffer[:n]

for packet, _ := c.read(); packet != nil; packet, _ = c.read() {
out, action := el.eventHandler.React(packet, c)
if out != nil {
Expand All @@ -129,27 +127,18 @@ func (el *eventloop) loopRead(c *conn) error {
return nil
}
}
_, _ = c.inboundBuffer.Write(c.buffer)

_ = c.inboundBuffer.MoveLeftoverToHead()
return nil
}

func (el *eventloop) loopWrite(c *conn) error {
el.eventHandler.PreWrite(c)

head, tail := c.outboundBuffer.PeekAll()
var (
n int
err error
)
if len(tail) > 0 {
n, err = io.Writev(c.fd, [][]byte{head, tail})
} else {
n, err = unix.Write(c.fd, head)
}
c.outboundBuffer.Discard(n)
iov := c.outboundBuffer.PeekBytesList()
n, err := io.Writev(c.fd, iov)
c.outboundBuffer.DiscardBytes(n)
switch err {
case nil, gerrors.ErrShortWritev: // do nothing, just go on
case nil:
case unix.EAGAIN:
return nil
default:
Expand All @@ -162,8 +151,6 @@ func (el *eventloop) loopWrite(c *conn) error {
_ = el.poller.ModRead(c.pollAttachment)
}

el.eventHandler.AfterWrite(c, nil)

return nil
}

Expand All @@ -175,13 +162,9 @@ func (el *eventloop) loopCloseConn(c *conn, err error) (rerr error) {
// Send residual data in buffer back to the peer before actually closing the connection.
if !c.outboundBuffer.IsEmpty() {
el.eventHandler.PreWrite(c)
head, tail := c.outboundBuffer.PeekAll()
if n, err := unix.Write(c.fd, head); err == nil {
if n == len(head) && tail != nil {
_, _ = unix.Write(c.fd, tail)
}
}
el.eventHandler.AfterWrite(c, nil)
iov := c.outboundBuffer.PeekBytesList()
_, _ = io.Writev(c.fd, iov)
c.outboundBuffer.Reset()
}

if err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd); err0 == nil && err1 == nil {
Expand Down
4 changes: 3 additions & 1 deletion gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal"
"github.com/panjf2000/gnet/logging"
"github.com/panjf2000/gnet/ringbuffer"
)

// Action is an action that occurs after the completion of an event.
Expand Down Expand Up @@ -283,9 +284,10 @@ func Serve(eventHandler EventHandler, protoAddr string, opts ...Option) (err err
}

if rbc := options.ReadBufferCap; rbc <= 0 {
options.ReadBufferCap = 0x10000
options.ReadBufferCap = ringbuffer.TCPReadBufferSize
} else {
options.ReadBufferCap = internal.CeilToPowerOfTwo(rbc)
ringbuffer.TCPReadBufferSize = options.ReadBufferCap
}

network, addr := parseProtoAddr(protoAddr)
Expand Down
6 changes: 6 additions & 0 deletions internal/io/io_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@ import "golang.org/x/sys/unix"

// Writev calls writev() on Linux.
func Writev(fd int, iov [][]byte) (int, error) {
if len(iov) == 0 {
return 0, nil
}
return unix.Writev(fd, iov)
}

// Readv calls readv() on Linux.
func Readv(fd int, iov [][]byte) (int, error) {
if len(iov) == 0 {
return 0, nil
}
return unix.Readv(fd, iov)
}
1 change: 1 addition & 0 deletions internal/socket/tcp_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

var listenerBacklogMaxSize = maxListenerBacklog()

// GetTCPSockAddr the structured addresses based on the protocol and raw address.
func GetTCPSockAddr(proto, addr string) (sa unix.Sockaddr, family int, tcpAddr *net.TCPAddr, ipv6only bool, err error) {
var tcpVersion string

Expand Down
1 change: 1 addition & 0 deletions internal/socket/udp_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/panjf2000/gnet/errors"
)

// GetUDPSockAddr the structured addresses based on the protocol and raw address.
func GetUDPSockAddr(proto, addr string) (sa unix.Sockaddr, family int, udpAddr *net.UDPAddr, ipv6only bool, err error) {
var udpVersion string

Expand Down
1 change: 1 addition & 0 deletions internal/socket/unix_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/panjf2000/gnet/errors"
)

// GetUnixSockAddr the structured addresses based on the protocol and raw address.
func GetUnixSockAddr(proto, addr string) (sa unix.Sockaddr, family int, unixAddr *net.UnixAddr, err error) {
unixAddr, err = net.ResolveUnixAddr(proto, addr)
if err != nil {
Expand Down
Loading

0 comments on commit 6aba6d7

Please sign in to comment.