diff --git a/client_test.go b/client_test.go index de9ead434..57902a719 100644 --- a/client_test.go +++ b/client_test.go @@ -19,7 +19,6 @@ package gnet import ( "encoding/binary" - "log" "math/rand" "strings" "sync" @@ -176,7 +175,6 @@ func (s *testCodecClientServer) OnOpened(c Conn) (out []byte, action Action) { } func (s *testCodecClientServer) OnClosed(c Conn, err error) (action Action) { - log.Println("close:", c.RemoteAddr(), err) require.Equal(s.tester, c.Context(), c, "invalid context") atomic.AddInt32(&s.disconnected, 1) if atomic.LoadInt32(&s.connected) == atomic.LoadInt32(&s.disconnected) && diff --git a/connection_unix.go b/connection_unix.go index 1497ab35c..ea41b170a 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -26,37 +26,36 @@ import ( "github.com/panjf2000/gnet/internal/netpoll" "github.com/panjf2000/gnet/internal/socket" + "github.com/panjf2000/gnet/listbuffer" "github.com/panjf2000/gnet/pool/bytebuffer" - prb "github.com/panjf2000/gnet/pool/ringbuffer" + rbPool "github.com/panjf2000/gnet/pool/ringbuffer" "github.com/panjf2000/gnet/ringbuffer" ) type conn struct { - fd int // file descriptor - sa unix.Sockaddr // remote socket address - ctx interface{} // user-defined context - loop *eventloop // connected event-loop - codec ICodec // codec for TCP - buffer []byte // reuse memory of inbound data as a temporary buffer - opened bool // connection opened event fired - localAddr net.Addr // local addr - remoteAddr net.Addr // remote addr - byteBuffer *bytebuffer.ByteBuffer // bytes buffer for buffering current packet and data in ring-buffer - inboundBuffer *ringbuffer.RingBuffer // buffer for data from the peer - outboundBuffer *ringbuffer.RingBuffer // buffer for data that is ready to write to the peer - pollAttachment *netpoll.PollAttachment // connection attachment for poller + fd int // file descriptor + sa unix.Sockaddr // remote socket address + ctx interface{} // user-defined context + loop *eventloop // connected event-loop + codec ICodec // codec for TCP + opened bool // connection opened event fired + localAddr net.Addr // local addr + remoteAddr net.Addr // remote addr + byteBuffer *bytebuffer.ByteBuffer // bytes buffer for buffering current packet and data in ring-buffer + inboundBuffer *ringbuffer.RingBuffer // buffer for data from the peer + outboundBuffer listbuffer.ByteBufferList // buffer for data that is ready to write to the peer + pollAttachment *netpoll.PollAttachment // connection attachment for poller } func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, codec ICodec, localAddr, remoteAddr net.Addr) (c *conn) { c = &conn{ - fd: fd, - sa: sa, - loop: el, - codec: codec, - localAddr: localAddr, - remoteAddr: remoteAddr, - inboundBuffer: prb.Get(), - outboundBuffer: prb.Get(), + fd: fd, + sa: sa, + loop: el, + codec: codec, + localAddr: localAddr, + remoteAddr: remoteAddr, + inboundBuffer: rbPool.GetWithSize(ringbuffer.TCPReadBufferSize), } c.pollAttachment = netpoll.GetPollAttachment() c.pollAttachment.FD, c.pollAttachment.Callback = fd, c.handleEvents @@ -67,13 +66,11 @@ func (c *conn) releaseTCP() { c.opened = false c.sa = nil c.ctx = nil - c.buffer = nil c.localAddr = nil c.remoteAddr = nil - prb.Put(c.inboundBuffer) - prb.Put(c.outboundBuffer) + rbPool.Put(c.inboundBuffer) c.inboundBuffer = ringbuffer.EmptyRingBuffer - c.outboundBuffer = ringbuffer.EmptyRingBuffer + c.outboundBuffer.Reset() bytebuffer.Put(c.byteBuffer) c.byteBuffer = nil netpoll.PutPollAttachment(c.pollAttachment) @@ -101,12 +98,12 @@ func (c *conn) open(buf []byte) error { c.loop.eventHandler.PreWrite(c) n, err := unix.Write(c.fd, buf) if err != nil && err == unix.EAGAIN { - _, _ = c.outboundBuffer.Write(buf) + c.outboundBuffer.PushBytes(buf) return nil } if err == nil && n < len(buf) { - _, _ = c.outboundBuffer.Write(buf[n:]) + c.outboundBuffer.PushBytes(buf[n:]) } return err @@ -129,7 +126,7 @@ func (c *conn) write(buf []byte) (err error) { // If there is pending data in outbound buffer, the current data ought to be appended to the outbound buffer // for maintaining the sequence of network packets. if !c.outboundBuffer.IsEmpty() { - _, _ = c.outboundBuffer.Write(packet) + c.outboundBuffer.PushBytes(packet) return } @@ -137,15 +134,15 @@ func (c *conn) write(buf []byte) (err error) { if n, err = unix.Write(c.fd, packet); err != nil { // A temporary error occurs, append the data to outbound buffer, writing it back to the peer in the next round. if err == unix.EAGAIN { - _, _ = c.outboundBuffer.Write(packet) + c.outboundBuffer.PushBytes(packet) err = c.loop.poller.ModReadWrite(c.pollAttachment) return } return c.loop.loopCloseConn(c, os.NewSyscallError("write", err)) } - // Fail to send all data back to the peer, buffer the leftover data for the next round. + // Failed to send all data back to the peer, buffer the leftover data for the next round. if n < len(packet) { - _, _ = c.outboundBuffer.Write(packet[n:]) + c.outboundBuffer.PushBytes(packet[n:]) err = c.loop.poller.ModReadWrite(c.pollAttachment) } return @@ -167,76 +164,31 @@ func (c *conn) sendTo(buf []byte) error { // ================================= Public APIs of gnet.Conn ================================= func (c *conn) Read() []byte { - if c.inboundBuffer.IsEmpty() { - return c.buffer - } - c.byteBuffer = c.inboundBuffer.WithByteBuffer(c.buffer) - return c.byteBuffer.B + buf, _ := c.inboundBuffer.PeekAll() + return buf } func (c *conn) ResetBuffer() { - c.buffer = c.buffer[:0] c.inboundBuffer.Reset() - bytebuffer.Put(c.byteBuffer) - c.byteBuffer = nil } -func (c *conn) ReadN(n int) (size int, buf []byte) { +func (c *conn) ReadN(n int) (int, []byte) { inBufferLen := c.inboundBuffer.Length() - tempBufferLen := len(c.buffer) - if totalLen := inBufferLen + tempBufferLen; totalLen < n || n <= 0 { - n = totalLen - } - size = n - if c.inboundBuffer.IsEmpty() { - buf = c.buffer[:n] - return - } - head, tail := c.inboundBuffer.Peek(n) - c.byteBuffer = bytebuffer.Get() - _, _ = c.byteBuffer.Write(head) - _, _ = c.byteBuffer.Write(tail) - if inBufferLen >= n { - buf = c.byteBuffer.B - return + if inBufferLen < n || n <= 0 { + buf, _ := c.inboundBuffer.PeekAll() + return inBufferLen, buf } - - restSize := n - inBufferLen - _, _ = c.byteBuffer.Write(c.buffer[:restSize]) - buf = c.byteBuffer.B - return + buf, _ := c.inboundBuffer.Peek(n) + return n, buf } -func (c *conn) ShiftN(n int) (size int) { - inBufferLen := c.inboundBuffer.Length() - tempBufferLen := len(c.buffer) - if inBufferLen+tempBufferLen < n || n <= 0 { - c.ResetBuffer() - size = inBufferLen + tempBufferLen - return - } - size = n - if c.inboundBuffer.IsEmpty() { - c.buffer = c.buffer[n:] - return - } - - bytebuffer.Put(c.byteBuffer) - c.byteBuffer = nil - - if inBufferLen >= n { - c.inboundBuffer.Discard(n) - return - } - c.inboundBuffer.Reset() - - restSize := n - inBufferLen - c.buffer = c.buffer[restSize:] - return +func (c *conn) ShiftN(n int) int { + c.inboundBuffer.Discard(n) + return n } func (c *conn) BufferLength() int { - return c.inboundBuffer.Length() + len(c.buffer) + return c.inboundBuffer.Length() } func (c *conn) AsyncWrite(buf []byte) error { diff --git a/errors/errors.go b/errors/errors.go index cd964c387..f2c1a582d 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -52,11 +52,4 @@ var ( ErrUnsupportedLength = errors.New("unsupported lengthFieldLength. (expected: 1, 2, 3, 4, or 8)") // ErrTooLessLength occurs when adjusted frame length is less than zero. ErrTooLessLength = errors.New("adjusted frame length is less than zero") - - // =============================================== internal errors ===============================================. - - // ErrShortWritev occurs when internal/io.Writev fails to send all data. - ErrShortWritev = errors.New("short writev") - // ErrShortReadv occurs when internal/io.Readv fails to send all data. - ErrShortReadv = errors.New("short readv") ) diff --git a/eventloop_unix.go b/eventloop_unix.go index 684070907..a62ee3964 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -96,15 +96,13 @@ func (el *eventloop) loopOpen(c *conn) error { } func (el *eventloop) loopRead(c *conn) error { - n, err := unix.Read(c.fd, el.buffer) + n, err := c.inboundBuffer.CopyFromSocket(c.fd, unix.Read) if n == 0 || err != nil { if err == unix.EAGAIN { return nil } return el.loopCloseConn(c, os.NewSyscallError("read", err)) } - c.buffer = el.buffer[:n] - for packet, _ := c.read(); packet != nil; packet, _ = c.read() { out, action := el.eventHandler.React(packet, c) if out != nil { @@ -129,27 +127,18 @@ func (el *eventloop) loopRead(c *conn) error { return nil } } - _, _ = c.inboundBuffer.Write(c.buffer) - + _ = c.inboundBuffer.MoveLeftoverToHead() return nil } func (el *eventloop) loopWrite(c *conn) error { el.eventHandler.PreWrite(c) - head, tail := c.outboundBuffer.PeekAll() - var ( - n int - err error - ) - if len(tail) > 0 { - n, err = io.Writev(c.fd, [][]byte{head, tail}) - } else { - n, err = unix.Write(c.fd, head) - } - c.outboundBuffer.Discard(n) + iov := c.outboundBuffer.PeekBytesList() + n, err := io.Writev(c.fd, iov) + c.outboundBuffer.DiscardBytes(n) switch err { - case nil, gerrors.ErrShortWritev: // do nothing, just go on + case nil: case unix.EAGAIN: return nil default: @@ -162,8 +151,6 @@ func (el *eventloop) loopWrite(c *conn) error { _ = el.poller.ModRead(c.pollAttachment) } - el.eventHandler.AfterWrite(c, nil) - return nil } @@ -175,13 +162,9 @@ func (el *eventloop) loopCloseConn(c *conn, err error) (rerr error) { // Send residual data in buffer back to the peer before actually closing the connection. if !c.outboundBuffer.IsEmpty() { el.eventHandler.PreWrite(c) - head, tail := c.outboundBuffer.PeekAll() - if n, err := unix.Write(c.fd, head); err == nil { - if n == len(head) && tail != nil { - _, _ = unix.Write(c.fd, tail) - } - } - el.eventHandler.AfterWrite(c, nil) + iov := c.outboundBuffer.PeekBytesList() + _, _ = io.Writev(c.fd, iov) + c.outboundBuffer.Reset() } if err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd); err0 == nil && err1 == nil { diff --git a/gnet.go b/gnet.go index 69a7d7023..38d8160af 100644 --- a/gnet.go +++ b/gnet.go @@ -25,6 +25,7 @@ import ( "github.com/panjf2000/gnet/errors" "github.com/panjf2000/gnet/internal" "github.com/panjf2000/gnet/logging" + "github.com/panjf2000/gnet/ringbuffer" ) // Action is an action that occurs after the completion of an event. @@ -283,9 +284,10 @@ func Serve(eventHandler EventHandler, protoAddr string, opts ...Option) (err err } if rbc := options.ReadBufferCap; rbc <= 0 { - options.ReadBufferCap = 0x10000 + options.ReadBufferCap = ringbuffer.TCPReadBufferSize } else { options.ReadBufferCap = internal.CeilToPowerOfTwo(rbc) + ringbuffer.TCPReadBufferSize = options.ReadBufferCap } network, addr := parseProtoAddr(protoAddr) diff --git a/internal/io/io_linux.go b/internal/io/io_linux.go index da577920d..641975c59 100644 --- a/internal/io/io_linux.go +++ b/internal/io/io_linux.go @@ -21,10 +21,16 @@ import "golang.org/x/sys/unix" // Writev calls writev() on Linux. func Writev(fd int, iov [][]byte) (int, error) { + if len(iov) == 0 { + return 0, nil + } return unix.Writev(fd, iov) } // Readv calls readv() on Linux. func Readv(fd int, iov [][]byte) (int, error) { + if len(iov) == 0 { + return 0, nil + } return unix.Readv(fd, iov) } diff --git a/internal/socket/tcp_socket.go b/internal/socket/tcp_socket.go index 35eefc3a8..eeed6637b 100644 --- a/internal/socket/tcp_socket.go +++ b/internal/socket/tcp_socket.go @@ -29,6 +29,7 @@ import ( var listenerBacklogMaxSize = maxListenerBacklog() +// GetTCPSockAddr the structured addresses based on the protocol and raw address. func GetTCPSockAddr(proto, addr string) (sa unix.Sockaddr, family int, tcpAddr *net.TCPAddr, ipv6only bool, err error) { var tcpVersion string diff --git a/internal/socket/udp_socket.go b/internal/socket/udp_socket.go index f5b15ef2c..0c6456d97 100644 --- a/internal/socket/udp_socket.go +++ b/internal/socket/udp_socket.go @@ -27,6 +27,7 @@ import ( "github.com/panjf2000/gnet/errors" ) +// GetUDPSockAddr the structured addresses based on the protocol and raw address. func GetUDPSockAddr(proto, addr string) (sa unix.Sockaddr, family int, udpAddr *net.UDPAddr, ipv6only bool, err error) { var udpVersion string diff --git a/internal/socket/unix_socket.go b/internal/socket/unix_socket.go index cad4f8593..ec90d61c9 100644 --- a/internal/socket/unix_socket.go +++ b/internal/socket/unix_socket.go @@ -26,6 +26,7 @@ import ( "github.com/panjf2000/gnet/errors" ) +// GetUnixSockAddr the structured addresses based on the protocol and raw address. func GetUnixSockAddr(proto, addr string) (sa unix.Sockaddr, family int, unixAddr *net.UnixAddr, err error) { unixAddr, err = net.ResolveUnixAddr(proto, addr) if err != nil { diff --git a/listbuffer/linked_list_buffer.go b/listbuffer/linked_list_buffer.go new file mode 100644 index 000000000..259b9f3bf --- /dev/null +++ b/listbuffer/linked_list_buffer.go @@ -0,0 +1,167 @@ +// Copyright (c) 2021 Andy Pan +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package listbuffer + +import ( + bPool "github.com/panjf2000/gnet/pool/bytebuffer" +) + +// ByteBuffer is the node of the linked list of bytes. +type ByteBuffer struct { + Buf *bPool.ByteBuffer + next *ByteBuffer +} + +// Len returns the length of ByteBuffer. +func (b *ByteBuffer) Len() int { + if b.Buf == nil { + return -1 + } + return b.Buf.Len() +} + +// IsEmpty indicates whether the ByteBuffer is empty. +func (b *ByteBuffer) IsEmpty() bool { + if b.Buf == nil { + return true + } + return b.Buf.Len() == 0 +} + +// ByteBufferList is a linked list of ByteBuffer. +type ByteBufferList struct { + bs [][]byte + head *ByteBuffer + tail *ByteBuffer + size int + bytes int64 +} + +// Pop returns and removes the head of l. If l is empty, it returns nil. +func (l *ByteBufferList) Pop() *ByteBuffer { + if l.head == nil { + return nil + } + b := l.head + l.head = b.next + if l.head == nil { + l.tail = nil + } + b.next = nil + l.size-- + l.bytes -= int64(b.Buf.Len()) + return b +} + +// PushFront adds the new node to the head of l. +func (l *ByteBufferList) PushFront(b *ByteBuffer) { + if b == nil { + return + } + if l.head == nil { + b.next = nil + l.tail = b + } else { + b.next = l.head + } + l.head = b + l.size++ + l.bytes += int64(b.Buf.Len()) +} + +// PushBack adds a new node to the tail of l. +func (l *ByteBufferList) PushBack(b *ByteBuffer) { + if b == nil { + return + } + if l.tail == nil { + l.head = b + } else { + l.tail.next = b + } + b.next = nil + l.tail = b + l.size++ + l.bytes += int64(b.Buf.Len()) +} + +// PushBytes is a wrapper of PushBack, which accepts []byte as its argument. +func (l *ByteBufferList) PushBytes(p []byte) { + if len(p) == 0 { + return + } + bb := bPool.Get() + _, _ = bb.Write(p) + l.PushBack(&ByteBuffer{Buf: bb}) +} + +// PeekBytesList assembles the [][]byte based on the list of ByteBuffer, +// it won't remove these nodes from l until DiscardBytes() is called. +func (l *ByteBufferList) PeekBytesList() [][]byte { + l.bs = l.bs[:0] + iter := l.head + for iter != nil { + l.bs = append(l.bs, iter.Buf.B) + iter = iter.next + } + return l.bs +} + +// DiscardBytes removes some nodes based on n. +func (l *ByteBufferList) DiscardBytes(n int) { + if n <= 0 { + return + } + for n != 0 { + b := l.Pop() + if b == nil { + break + } + if n < b.Len() { + b.Buf.B = b.Buf.B[n:] + l.PushFront(b) + break + } + n -= b.Len() + bPool.Put(b.Buf) + } +} + +// Len returns the length of the list. +func (l *ByteBufferList) Len() int { + return l.size +} + +// Bytes returns the amount of bytes in this list. +func (l *ByteBufferList) Bytes() int64 { + return l.bytes +} + +// IsEmpty reports whether l is empty. +func (l *ByteBufferList) IsEmpty() bool { + return l.head == nil +} + +// Reset removes all elements in this list. +func (l *ByteBufferList) Reset() { + for b := l.Pop(); b != nil; b = l.Pop() { + bPool.Put(b.Buf) + } + l.head = nil + l.tail = nil + l.size = 0 + l.bytes = 0 + l.bs = l.bs[:0] +} diff --git a/logging/logger.go b/logging/logger.go index 1661eeaaf..c8b6ea4c2 100644 --- a/logging/logger.go +++ b/logging/logger.go @@ -62,6 +62,7 @@ var ( defaultLoggingLevel Level ) +// Level is the alias of zapcore.Level. type Level = zapcore.Level const ( diff --git a/pool/ringbuffer/ringbuffer.go b/pool/ringbuffer/ringbuffer.go index e95b05975..2ef88877b 100644 --- a/pool/ringbuffer/ringbuffer.go +++ b/pool/ringbuffer/ringbuffer.go @@ -67,7 +67,7 @@ func Get() *RingBuffer { return defaultPool.Get() } // // The byte buffer may be returned to the pool via Put after the use // in order to minimize GC overhead. -func (p *Pool) Get() *ringbuffer.RingBuffer { +func (p *Pool) Get() *RingBuffer { v := p.pool.Get() if v != nil { return v.(*RingBuffer) @@ -75,6 +75,22 @@ func (p *Pool) Get() *ringbuffer.RingBuffer { return ringbuffer.New(int(atomic.LoadUint64(&p.defaultSize))) } +// GetWithSize is like Get(), but with initial size. +func GetWithSize(size int) *RingBuffer { return defaultPool.GetWithSize(size) } + +// GetWithSize is like Pool.Get(), but with initial size. +func (p *Pool) GetWithSize(size int) *RingBuffer { + v := p.pool.Get() + if v != nil { + rb := v.(*RingBuffer) + if rb.Cap() >= size { + return rb + } + p.pool.Put(v) + } + return ringbuffer.New(size) +} + // Put returns byte buffer to the pool. // // ByteBuffer.B mustn't be touched after returning it to the pool. diff --git a/ringbuffer/ring_buffer.go b/ringbuffer/ring_buffer.go index 187032eb0..c49114538 100644 --- a/ringbuffer/ring_buffer.go +++ b/ringbuffer/ring_buffer.go @@ -31,7 +31,10 @@ const ( bufferGrowThreshold = 4 * 1024 // 4KB ) -// ErrIsEmpty will be returned when trying to read a empty ring-buffer. +// TCPReadBufferSize is the default read buffer size for each TCP socket. +var TCPReadBufferSize = 64 * 1024 // 64KB + +// ErrIsEmpty will be returned when trying to read an empty ring-buffer. var ErrIsEmpty = errors.New("ring-buffer is empty") // RingBuffer is a circular buffer that implement io.ReaderWriter interface. @@ -240,6 +243,42 @@ func (rb *RingBuffer) Write(p []byte) (n int, err error) { return } +// ========================= gnet specific APIs ========================= + +// CopyFromSocket copies data from a socket fd into ring-buffer. +func (rb *RingBuffer) CopyFromSocket(fd int, read func(int, []byte) (int, error)) (n int, err error) { + n, err = read(fd, rb.buf[rb.w:]) + if n > 0 { + rb.isEmpty = false + } + rb.w += n + if rb.w == rb.size { + rb.w = 0 + } + return +} + +// MoveLeftoverToHead moves the data from its tail to head. +func (rb *RingBuffer) MoveLeftoverToHead() int { + if rb.IsEmpty() { + rb.Reset() + return 0 + } + if rb.w != 0 { + return 0 + } + if rb.r < rb.Length() { + rb.grow(rb.Length() + TCPReadBufferSize) + return rb.Length() + } + n := copy(rb.buf, rb.buf[rb.r:]) + rb.r = 0 + rb.w = n + return n +} + +// ========================= gnet specific APIs ========================= + // WriteByte writes one byte into buffer. func (rb *RingBuffer) WriteByte(c byte) error { if rb.Free() < 1 {