From ef3b6248ba60cd9208862eec44d59994fadc4894 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Mon, 11 Sep 2023 11:52:50 +0200 Subject: [PATCH 01/15] Rewrite crucial paths for optimized ring buffer data access --- capture/afpacket/afring/afring.go | 120 +++++++++++------------------ capture/afpacket/afring/tpacket.go | 114 ++++++++++++++++++++++----- 2 files changed, 139 insertions(+), 95 deletions(-) diff --git a/capture/afpacket/afring/afring.go b/capture/afpacket/afring/afring.go index 0413b7b..616dc05 100644 --- a/capture/afpacket/afring/afring.go +++ b/capture/afpacket/afring/afring.go @@ -139,121 +139,93 @@ func (s *Source) NewPacket() capture.Packet { // NextPacket receives the next packet from the source and returns it. The operation is blocking. In // case a non-nil "buffer" Packet is provided it will be populated with the data (and returned). The // buffer packet can be reused. Otherwise a new Packet is allocated. -func (s *Source) NextPacket(pBuf capture.Packet) (capture.Packet, error) { +func (s *Source) NextPacket(pBuf capture.Packet) (pkt capture.Packet, err error) { - if err := s.nextPacket(); err != nil { - return nil, err + if err = s.nextPacket(); err != nil { + return } - var ( - data capture.Packet - snapLen = int(s.curTPacketHeader.snapLen()) - ) - // If a buffer was provided, et the correct length of the buffer and populate it - // Otherwise, allocate a new Packet + // If a buffer was provided, extend it to maximum capacity if pBuf != nil { - data = pBuf[:cap(pBuf)] - } else { - data = make(capture.Packet, capture.PacketHdrOffset+snapLen) + pkt = pBuf[:cap(pBuf)] } - // Populate the packet - data[0] = s.curTPacketHeader.packetType() - data[1] = s.ipLayerOffset - s.curTPacketHeader.pktLenPut(data[2:6]) - s.curTPacketHeader.payloadCopyPutAtOffset(data[6:], 0, uint32(snapLen)) - if snapLen+capture.PacketHdrOffset < len(data) { - data = data[:capture.PacketHdrOffset+snapLen] - } + // Populate the packet / buffer + pkt = s.curTPacketHeader.packetPut(pkt, s.ipLayerOffset) - return data, nil + return } // NextPayload receives the raw payload of the next packet from the source and returns it. The operation is blocking. // In case a non-nil "buffer" byte slice / payload is provided it will be populated with the data (and returned). // The buffer can be reused. Otherwise a new byte slice / payload is allocated. -func (s *Source) NextPayload(pBuf []byte) ([]byte, capture.PacketType, uint32, error) { +func (s *Source) NextPayload(pBuf []byte) (payload []byte, pktType capture.PacketType, pktLen uint32, err error) { - if err := s.nextPacket(); err != nil { - return nil, capture.PacketUnknown, 0, err + if err = s.nextPacket(); err != nil { + pktType = capture.PacketUnknown + return } - var ( - data []byte - snapLen = s.curTPacketHeader.snapLen() - ) - // If a buffer was provided, et the correct length of the buffer and populate it - // Otherwise, allocate a new byte slice - then populate the packet + // If a buffer was provided, extend it to maximum capacity if pBuf != nil { - data = s.curTPacketHeader.payloadNoCopyAtOffset(0, snapLen) - } else { - data = make([]byte, snapLen) - s.curTPacketHeader.payloadCopyPutAtOffset(data, 0, snapLen) + payload = pBuf[:cap(pBuf)] } - if int(snapLen) < len(data) { - data = data[:snapLen] - } + // Populate the payload / buffer & parameters + payload, pktType, pktLen = s.curTPacketHeader.payloadPut(payload, 0) - return data, s.curTPacketHeader.packetType(), s.curTPacketHeader.pktLen(), nil + return } // NextPayloadZeroCopy receives the raw payload of the next packet from the source and returns it. The operation is blocking. // The returned payload provides direct zero-copy access to the underlying data source (e.g. a ring buffer). -func (s *Source) NextPayloadZeroCopy() ([]byte, capture.PacketType, uint32, error) { +func (s *Source) NextPayloadZeroCopy() (payload []byte, pktType capture.PacketType, pktLen uint32, err error) { - if err := s.nextPacket(); err != nil { - return nil, capture.PacketUnknown, 0, err + if err = s.nextPacket(); err != nil { + pktType = capture.PacketUnknown + return } - return s.curTPacketHeader.payloadNoCopyAtOffset(0, s.curTPacketHeader.snapLen()), - s.curTPacketHeader.packetType(), - s.curTPacketHeader.pktLen(), - nil + // Extract the payload (zero-copy) & parameters + payload, pktType, pktLen = s.curTPacketHeader.payloadZeroCopy(0) + + return } // NextIPPacket receives the IP layer of the next packet from the source and returns it. The operation is blocking. // In case a non-nil "buffer" IPLayer is provided it will be populated with the data (and returned). // The buffer can be reused. Otherwise a new IPLayer is allocated. -func (s *Source) NextIPPacket(pBuf capture.IPLayer) (capture.IPLayer, capture.PacketType, uint32, error) { +func (s *Source) NextIPPacket(pBuf capture.IPLayer) (ipLayer capture.IPLayer, pktType capture.PacketType, pktLen uint32, err error) { - if err := s.nextPacket(); err != nil { - return nil, capture.PacketUnknown, 0, err + if err = s.nextPacket(); err != nil { + pktType = capture.PacketUnknown + return } - var ( - data capture.IPLayer - snapLen = s.curTPacketHeader.snapLen() - ) - // If a buffer was provided, et the correct length of the buffer and populate it - // Otherwise, allocate a new IPLayer + // If a buffer was provided, extend it to maximum capacity if pBuf != nil { - data = pBuf[:cap(pBuf)] - } else { - data = make(capture.IPLayer, snapLen) + ipLayer = pBuf[:cap(pBuf)] } - // Populate the packet - s.curTPacketHeader.payloadCopyPutAtOffset(data, uint32(s.ipLayerOffset), snapLen) - if ipLayerSnaplen := snapLen - uint32(s.ipLayerOffset); int(ipLayerSnaplen) < len(data) { - data = data[:ipLayerSnaplen] - } + // Populate the IP layer / buffer & parameters + ipLayer, pktType, pktLen = s.curTPacketHeader.payloadPut(ipLayer, s.ipLayerOffset) - return data, s.curTPacketHeader.packetType(), s.curTPacketHeader.pktLen(), nil + return } // NextIPPacketZeroCopy receives the IP layer of the next packet from the source and returns it. The operation is blocking. // The returned IPLayer provides direct zero-copy access to the underlying data source (e.g. a ring buffer). -func (s *Source) NextIPPacketZeroCopy() (capture.IPLayer, capture.PacketType, uint32, error) { +func (s *Source) NextIPPacketZeroCopy() (ipLayer capture.IPLayer, pktType capture.PacketType, pktLen uint32, err error) { - if err := s.nextPacket(); err != nil { - return nil, capture.PacketUnknown, 0, err + if err = s.nextPacket(); err != nil { + pktType = capture.PacketUnknown + return } - return s.curTPacketHeader.payloadNoCopyAtOffset(uint32(s.ipLayerOffset), s.curTPacketHeader.snapLen()), - s.curTPacketHeader.packetType(), - s.curTPacketHeader.pktLen(), - nil + // Extract the IP layer (zero-copy) & parameters + ipLayer, pktType, pktLen = s.curTPacketHeader.payloadZeroCopy(s.ipLayerOffset) + + return } // NextPacketFn executes the provided function on the next packet received on the source. If possible, the @@ -265,10 +237,10 @@ func (s *Source) NextPacketFn(fn func(payload []byte, totalLen uint32, pktType c return err } - return fn(s.curTPacketHeader.payloadNoCopyAtOffset(0, s.curTPacketHeader.snapLen()), - s.curTPacketHeader.pktLen(), - s.curTPacketHeader.packetType(), - s.ipLayerOffset) + // Extract the payload (zero-copy) & parameters + payload, pktType, pktLen := s.curTPacketHeader.payloadZeroCopy(0) + + return fn(payload, pktLen, pktType, s.ipLayerOffset) } // Stats returns (and clears) the packet counters of the underlying source diff --git a/capture/afpacket/afring/tpacket.go b/capture/afpacket/afring/tpacket.go index 8fb60db..13d067b 100644 --- a/capture/afpacket/afring/tpacket.go +++ b/capture/afpacket/afring/tpacket.go @@ -8,6 +8,7 @@ import ( "math" "unsafe" + "github.com/fako1024/slimcap/capture" "golang.org/x/sys/unix" ) @@ -86,14 +87,26 @@ func (t tPacketRequest) blockSizeNr() int { return int(t.blockSize * t.blockNr) } -// tPacketHeader denotes the V3 tpacket_hdr structure, c.f. -// https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_packet.h +// tPacketHeader denotes a wrapper around the raw data of a TPacket block structure type tPacketHeader struct { data []byte ppos uint32 nPktsLeft uint32 } +// tPacketHeaderV3 denotes the V3 tpacket_hdr structure, c.f. +// https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_packet.h +// Note: The struct parses only the relevant portions of the header, the rest is +// skipped / ignored by means of dummy elements of the correct in-memory size +type tPacketHeaderV3 struct { + snaplen uint32 // 12-16 + pktLen uint32 // 16-20 + _ uint32 // skip + pktPos uint32 // 24-28 + _ [15]uint16 // skip + pktType byte // 58 +} + func (t tPacketHeader) nPkts() uint32 { return *(*uint32)(unsafe.Pointer(&t.data[12])) // #nosec G103 } @@ -102,23 +115,10 @@ func (t tPacketHeader) offsetToFirstPkt() uint32 { return *(*uint32)(unsafe.Pointer(&t.data[16])) // #nosec G103 } -// 2 * 3 * uint32 for timestamps - -// / -> Packet Header func (t tPacketHeader) nextOffset() uint32 { return *(*uint32)(unsafe.Pointer(&t.data[t.ppos])) // #nosec G103 } -// 2 * uint32 for timestamps - -func (t tPacketHeader) snapLen() uint32 { - return *(*uint32)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 -} - -func (t tPacketHeader) pktLen() uint32 { - return *(*uint32)(unsafe.Pointer(&t.data[t.ppos+16])) // #nosec G103 -} - func (t tPacketHeader) pktLenPut(data []byte) { copy(data, t.data[t.ppos+16:t.ppos+20]) } @@ -127,16 +127,70 @@ func (t tPacketHeader) packetType() byte { return t.data[t.ppos+58] } -func (t tPacketHeader) payloadNoCopyAtOffset(offset, to uint32) []byte { - pos := t.ppos + uint32(*(*uint16)(unsafe.Pointer(&t.data[t.ppos+24]))) // #nosec G103 - return t.data[pos+offset : pos+to] +func (t tPacketHeader) payloadZeroCopy(offset byte) ([]byte, byte, uint32) { + + // Parse the V3 TPacketHeader and the first byte of the payload + hdr := (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 + pos := t.ppos + hdr.pktPos + uint32(offset) + + // Return the payload / IP layer subslice & heeader parameters + return t.data[pos : pos+hdr.snaplen], + hdr.pktType, + hdr.pktLen } -func (t tPacketHeader) payloadCopyPutAtOffset(data []byte, offset, to uint32) { - pos := t.ppos + uint32(*(*uint16)(unsafe.Pointer(&t.data[t.ppos+24]))) // #nosec G103 - copy(data, t.data[pos+offset:pos+to]) +func (t tPacketHeader) packetPut(data capture.Packet, ipLayerOffset byte) capture.Packet { + + // Parse the V3 TPacketHeader, the first byte of the payload and snaplen + hdr := (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 + pos := t.ppos + hdr.pktPos + snapLen := int(hdr.snaplen) + + // Allocate new capture.Packet if no buffer was provided + if data == nil { + data = make(capture.Packet, capture.PacketHdrOffset+snapLen) + } + + // Extract / copy all required data / header parameters + data[0] = hdr.pktType + data[1] = ipLayerOffset + t.pktLenPut(data[2:6]) + copy(data[6:], t.data[pos:pos+hdr.snaplen]) + + // Ensure correct packet length + if snapLen+capture.PacketHdrOffset < len(data) { + data = data[:capture.PacketHdrOffset+snapLen] + } + + return data } +func (t tPacketHeader) payloadPut(data []byte, offset byte) ([]byte, capture.PacketType, uint32) { + + // Parse the V3 TPacketHeader, the first byte of the payload and snaplen + hdr := (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 + pos := t.ppos + hdr.pktPos + uint32(offset) + snapLen := int(hdr.snaplen) + + // Allocate new payload / IP layer if no buffer was provided + if data == nil { + data = make([]byte, snapLen) + } + + // Copy payload / IP layer + copy(data, t.data[pos:pos+hdr.snaplen]) + + // Ensure correct data length + if effectiveSnapLen := snapLen - int(offset); effectiveSnapLen < len(data) { + data = data[:effectiveSnapLen] + } + + // Return payload / IP layer & header parameters + return data, hdr.pktType, hdr.pktLen +} + +////////////////////////////////////////////////////////////////////////////////////////////////// + func pageSizeAlign(x int) int { return int((uint(x) + pageSizeAlignment - 1) &^ (pageSizeAlignment - 1)) } @@ -180,6 +234,14 @@ func blockSizeTPacketAlign(x, blockSize int) (int, error) { // return *(*uint32)(unsafe.Pointer(&t.data[4])) // } +// func (t tPacketHeader) snapLen() uint32 { +// return *(*uint32)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 +// } + +// func (t tPacketHeader) pktLen() uint32 { +// return *(*uint32)(unsafe.Pointer(&t.data[t.ppos+16])) // #nosec G103 +// } + // func (t tPacketHeader) blockLen() uint32 { // return *(*uint32)(unsafe.Pointer(&t.data[20])) // } @@ -204,3 +266,13 @@ func blockSizeTPacketAlign(x, blockSize int) (int, error) { // func (t tPacketHeader) net() uint16 { // return *(*uint16)(unsafe.Pointer(&t.data[t.ppos+26])) // } + +// func (t tPacketHeader) payloadNoCopyAtOffset(offset, to uint32) []byte { +// pos := t.ppos + uint32(*(*uint16)(unsafe.Pointer(&t.data[t.ppos+24]))) // #nosec G103 +// return t.data[pos+offset : pos+to] +// } + +// func (t tPacketHeader) payloadCopyPutAtOffset(data []byte, offset, to uint32) { +// pos := t.ppos + uint32(*(*uint16)(unsafe.Pointer(&t.data[t.ppos+24]))) // #nosec G103 +// copy(data, t.data[pos+offset:pos+to]) +// } From e34c83fc7f554d18ce2844b8a778e7b20108a722 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Mon, 11 Sep 2023 12:10:58 +0200 Subject: [PATCH 02/15] Fix incorrect type of packet position in buffer block --- capture/afpacket/afring/tpacket.go | 10 +++++----- examples/dump/main.go | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/capture/afpacket/afring/tpacket.go b/capture/afpacket/afring/tpacket.go index 13d067b..8679627 100644 --- a/capture/afpacket/afring/tpacket.go +++ b/capture/afpacket/afring/tpacket.go @@ -102,8 +102,8 @@ type tPacketHeaderV3 struct { snaplen uint32 // 12-16 pktLen uint32 // 16-20 _ uint32 // skip - pktPos uint32 // 24-28 - _ [15]uint16 // skip + pktPos uint16 // 24-26 + _ [16]uint16 // skip pktType byte // 58 } @@ -131,7 +131,7 @@ func (t tPacketHeader) payloadZeroCopy(offset byte) ([]byte, byte, uint32) { // Parse the V3 TPacketHeader and the first byte of the payload hdr := (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 - pos := t.ppos + hdr.pktPos + uint32(offset) + pos := t.ppos + uint32(hdr.pktPos) + uint32(offset) // Return the payload / IP layer subslice & heeader parameters return t.data[pos : pos+hdr.snaplen], @@ -143,7 +143,7 @@ func (t tPacketHeader) packetPut(data capture.Packet, ipLayerOffset byte) captur // Parse the V3 TPacketHeader, the first byte of the payload and snaplen hdr := (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 - pos := t.ppos + hdr.pktPos + pos := t.ppos + uint32(hdr.pktPos) snapLen := int(hdr.snaplen) // Allocate new capture.Packet if no buffer was provided @@ -169,7 +169,7 @@ func (t tPacketHeader) payloadPut(data []byte, offset byte) ([]byte, capture.Pac // Parse the V3 TPacketHeader, the first byte of the payload and snaplen hdr := (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 - pos := t.ppos + hdr.pktPos + uint32(offset) + pos := t.ppos + uint32(hdr.pktPos) + uint32(offset) snapLen := int(hdr.snaplen) // Allocate new payload / IP layer if no buffer was provided diff --git a/examples/dump/main.go b/examples/dump/main.go index 0131bd3..86cbb11 100644 --- a/examples/dump/main.go +++ b/examples/dump/main.go @@ -10,6 +10,7 @@ import ( "os" "github.com/els0r/telemetry/logging" + "github.com/fako1024/slimcap/capture" "github.com/fako1024/slimcap/capture/afpacket/afring" "github.com/fako1024/slimcap/link" ) @@ -65,7 +66,7 @@ func main() { logger.Infof("Reading %d packets from wire (zero-copy function call)...", maxPkts) for i := 0; i < maxPkts; i++ { if err := listener.NextPacketFn(func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) (err error) { - logger.Infof("Received packet with Payload on `%s` (total len %d): %v (inbound: %v)", devName, totalLen, payload, p.IsInbound()) + logger.Infof("Received packet with Payload on `%s` (total len %d): %v (inbound: %v)", devName, totalLen, payload, pktType != capture.PacketOutgoing) return }); err != nil { logger.Fatalf("error during capture (zero-copy function call) on `%s`: %s", devName, err) From 3932deffbc970989e155060fe655343b31ed4b16 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Mon, 11 Sep 2023 12:16:51 +0200 Subject: [PATCH 03/15] Use new struct approach for mock population --- capture/afpacket/afring/afring_mock.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/capture/afpacket/afring/afring_mock.go b/capture/afpacket/afring/afring_mock.go index 2da6270..74dfe4e 100644 --- a/capture/afpacket/afring/afring_mock.go +++ b/capture/afpacket/afring/afring_mock.go @@ -129,11 +129,13 @@ func (m *MockSource) addPacket(payload []byte, totalLen uint32, pktType, ipLayer block := m.ringBuffer.ring[thisBlock*m.blockSize : thisBlock*m.blockSize+m.blockSize] - *(*uint32)(unsafe.Pointer(&block[m.curBlockPos+12])) = uint32(m.snapLen) // #nosec: G103 // snapLen - *(*uint32)(unsafe.Pointer(&block[m.curBlockPos+16])) = totalLen // #nosec: G103 // totalLen - *(*uint32)(unsafe.Pointer(&block[m.curBlockPos+24])) = uint32(mac) // #nosec: G103 // mac - block[m.curBlockPos+58] = pktType // pktType - copy(block[m.curBlockPos+mac:m.curBlockPos+mac+m.snapLen], payload) // payload + *(*tPacketHeaderV3)(unsafe.Pointer(&block[m.curBlockPos+12])) = tPacketHeaderV3{ + snaplen: uint32(m.snapLen), + pktLen: totalLen, + pktPos: mac, + pktType: pktType, + } // #nosec: G103 + copy(block[m.curBlockPos+mac:m.curBlockPos+mac+m.snapLen], payload) // payload // Ensure that there is no "stray" nextOffset set from a previous perusal of this ring buffer block which // might remain in case the block is finalized From f2c78728124300a544479a910863c940ba8debc2 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Mon, 11 Sep 2023 13:07:29 +0200 Subject: [PATCH 04/15] Remove redundant block packet counter and minimize type conversions for constant values --- capture/afpacket/afring/afring.go | 12 ++++++------ capture/afpacket/afring/afring_mock.go | 1 + capture/afpacket/afring/tpacket.go | 23 +++++++++++------------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/capture/afpacket/afring/afring.go b/capture/afpacket/afring/afring.go index 616dc05..3c8d961 100644 --- a/capture/afpacket/afring/afring.go +++ b/capture/afpacket/afring/afring.go @@ -50,6 +50,8 @@ type Source struct { isPromisc bool link *link.Link + ipLayerOffsetNum uint32 + unblocked bool ringBuffer @@ -88,6 +90,7 @@ func NewSourceFromLink(link *link.Link, options ...Option) (*Source, error) { ipLayerOffset: link.Type.IPHeaderOffset(), link: link, } + src.ipLayerOffsetNum = uint32(src.ipLayerOffset) for _, opt := range options { opt(src) @@ -208,7 +211,7 @@ func (s *Source) NextIPPacket(pBuf capture.IPLayer) (ipLayer capture.IPLayer, pk } // Populate the IP layer / buffer & parameters - ipLayer, pktType, pktLen = s.curTPacketHeader.payloadPut(ipLayer, s.ipLayerOffset) + ipLayer, pktType, pktLen = s.curTPacketHeader.payloadPut(ipLayer, s.ipLayerOffsetNum) return } @@ -223,7 +226,7 @@ func (s *Source) NextIPPacketZeroCopy() (ipLayer capture.IPLayer, pktType captur } // Extract the IP layer (zero-copy) & parameters - ipLayer, pktType, pktLen = s.curTPacketHeader.payloadZeroCopy(s.ipLayerOffset) + ipLayer, pktType, pktLen = s.curTPacketHeader.payloadZeroCopy(s.ipLayerOffsetNum) return } @@ -351,12 +354,11 @@ fetch: // After fetching a new TPacketHeader, set the position of the first packet and the number of packets // in this TPacketHeader s.curTPacketHeader.ppos = s.curTPacketHeader.offsetToFirstPkt() - s.curTPacketHeader.nPktsLeft = s.curTPacketHeader.nPkts() } else { // If there is no next offset, release the TPacketHeader to the kernel and fetch a new one nextPos := s.curTPacketHeader.nextOffset() - if s.curTPacketHeader.nPktsLeft == 0 { + if nextPos == 0 { s.curTPacketHeader.setStatus(unix.TP_STATUS_KERNEL) s.offset = (s.offset + 1) % int(s.tpReq.blockNr) s.curTPacketHeader.data = nil @@ -367,8 +369,6 @@ fetch: s.curTPacketHeader.ppos += nextPos } - s.curTPacketHeader.nPktsLeft-- - // Apply filter (if any) if filter := s.link.FilterMask(); filter > 0 && filter&s.curTPacketHeader.packetType() != 0 { goto fetch diff --git a/capture/afpacket/afring/afring_mock.go b/capture/afpacket/afring/afring_mock.go index 74dfe4e..94cfc06 100644 --- a/capture/afpacket/afring/afring_mock.go +++ b/capture/afpacket/afring/afring_mock.go @@ -62,6 +62,7 @@ func NewMockSource(_ string, options ...Option) (*MockSource, error) { }, eventHandler: mockHandler, } + src.ipLayerOffsetNum = uint32(src.ipLayerOffset) for _, opt := range options { opt(src) diff --git a/capture/afpacket/afring/tpacket.go b/capture/afpacket/afring/tpacket.go index 8679627..76acf2e 100644 --- a/capture/afpacket/afring/tpacket.go +++ b/capture/afpacket/afring/tpacket.go @@ -89,9 +89,8 @@ func (t tPacketRequest) blockSizeNr() int { // tPacketHeader denotes a wrapper around the raw data of a TPacket block structure type tPacketHeader struct { - data []byte - ppos uint32 - nPktsLeft uint32 + data []byte + ppos uint32 } // tPacketHeaderV3 denotes the V3 tpacket_hdr structure, c.f. @@ -107,10 +106,6 @@ type tPacketHeaderV3 struct { pktType byte // 58 } -func (t tPacketHeader) nPkts() uint32 { - return *(*uint32)(unsafe.Pointer(&t.data[12])) // #nosec G103 -} - func (t tPacketHeader) offsetToFirstPkt() uint32 { return *(*uint32)(unsafe.Pointer(&t.data[16])) // #nosec G103 } @@ -127,11 +122,11 @@ func (t tPacketHeader) packetType() byte { return t.data[t.ppos+58] } -func (t tPacketHeader) payloadZeroCopy(offset byte) ([]byte, byte, uint32) { +func (t tPacketHeader) payloadZeroCopy(offset uint32) ([]byte, byte, uint32) { // Parse the V3 TPacketHeader and the first byte of the payload hdr := (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 - pos := t.ppos + uint32(hdr.pktPos) + uint32(offset) + pos := t.ppos + offset + uint32(hdr.pktPos) // Return the payload / IP layer subslice & heeader parameters return t.data[pos : pos+hdr.snaplen], @@ -152,9 +147,9 @@ func (t tPacketHeader) packetPut(data capture.Packet, ipLayerOffset byte) captur } // Extract / copy all required data / header parameters + t.pktLenPut(data[2:6]) data[0] = hdr.pktType data[1] = ipLayerOffset - t.pktLenPut(data[2:6]) copy(data[6:], t.data[pos:pos+hdr.snaplen]) // Ensure correct packet length @@ -165,11 +160,11 @@ func (t tPacketHeader) packetPut(data capture.Packet, ipLayerOffset byte) captur return data } -func (t tPacketHeader) payloadPut(data []byte, offset byte) ([]byte, capture.PacketType, uint32) { +func (t tPacketHeader) payloadPut(data []byte, offset uint32) ([]byte, capture.PacketType, uint32) { // Parse the V3 TPacketHeader, the first byte of the payload and snaplen hdr := (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 - pos := t.ppos + uint32(hdr.pktPos) + uint32(offset) + pos := t.ppos + offset + uint32(hdr.pktPos) snapLen := int(hdr.snaplen) // Allocate new payload / IP layer if no buffer was provided @@ -234,6 +229,10 @@ func blockSizeTPacketAlign(x, blockSize int) (int, error) { // return *(*uint32)(unsafe.Pointer(&t.data[4])) // } +// func (t tPacketHeader) nPkts() uint32 { +// return *(*uint32)(unsafe.Pointer(&t.data[12])) // #nosec G103 +// } + // func (t tPacketHeader) snapLen() uint32 { // return *(*uint32)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 // } From 196377e4c7f263b9d25883fed74569db9925e925 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Tue, 12 Sep 2023 13:54:18 +0200 Subject: [PATCH 05/15] Further optimize PPOLL assembler and code inlining --- event/poll.go | 18 ++++-------------- event/poll_386.s | 11 ++++++----- event/poll_amd64.s | 11 ++++++----- event/poll_arm.s | 11 ++++++----- event/poll_arm64.s | 11 ++++++----- event/poll_asm.go | 13 +++++-------- event/poll_default.go | 6 ++++-- event/poll_mock.go | 22 +++++++++++++--------- event/poll_nomock.go | 11 +++++++++-- 9 files changed, 59 insertions(+), 55 deletions(-) diff --git a/event/poll.go b/event/poll.go index 9fcf9dd..b64b5eb 100644 --- a/event/poll.go +++ b/event/poll.go @@ -9,7 +9,10 @@ import ( "golang.org/x/sys/unix" ) -const nPollEvents = 2 +const ( + eventPollIn = unix.POLLIN + eventConnReset = unix.POLLHUP | unix.POLLERR +) ///////////////////////////////////////////////////////////////////////////////////////// @@ -29,16 +32,3 @@ func (p *Handler) recvfrom(buf []byte, flags int) (int, uint8, error) { return n, pktType, nil } - -func poll(pollEvents [nPollEvents]unix.PollFd) (bool, unix.Errno) { - errno := pollBlock(&pollEvents[0], nPollEvents) - if errno != 0 { - return pollEvents[0].Revents&unix.POLLIN != 0, errno - } - - if pollEvents[1].Revents&unix.POLLHUP != 0 || pollEvents[1].Revents&unix.POLLERR != 0 { - errno = unix.ECONNRESET - } - - return pollEvents[0].Revents&unix.POLLIN != 0, errno -} diff --git a/event/poll_386.s b/event/poll_386.s index 97fcb5d..36dd127 100644 --- a/event/poll_386.s +++ b/event/poll_386.s @@ -2,23 +2,24 @@ #define INVOKE_SYSCALL INT $0x80 #define SYS__PPOLL 0x135 +#define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd, nfds int) (err syscall.Errno) -TEXT ·pollBlock(SB),NOSPLIT,$0-12 +// func pollBlock(fds *unix.PollFd) (err syscall.Errno) +TEXT ·pollBlock(SB),NOSPLIT,$0-8 CALL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVL $SYS__PPOLL, AX // Prepare / perform ppoll() SYSCALL MOVL fds+0(FP), BX // PollFDs parameter - MOVL nfds+4(FP), CX // Put nFDs parameter + MOVL $N_EVTS, CX // Put nFDs parameter (constant N_EVTS) MOVL $0x0, DX // Put timeout parameter (set to NULL) MOVL $0x0, SI // Put sigmask parameter (skip) INVOKE_SYSCALL CMPL AX, $0xfffff002 // No error / EINTR JLS success // Jump to success NEGL AX // Negate SYSCALL errno - MOVL AX, err+8(FP) // Store error code in err return value + MOVL AX, err+4(FP) // Store error code in err return value CALL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return success: - MOVL $0, err+8(FP) // Store NULL error code in err return value + MOVL $0, err+4(FP) // Store NULL error code in err return value CALL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return diff --git a/event/poll_amd64.s b/event/poll_amd64.s index b4a1dfe..670a63e 100644 --- a/event/poll_amd64.s +++ b/event/poll_amd64.s @@ -1,12 +1,13 @@ #include "textflag.h" #define SYS__PPOLL 0x10f +#define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd, nfds int) (err syscall.Errno) -TEXT ·pollBlock(SB),NOSPLIT,$0-24 +// func pollBlock(fds *unix.PollFd) (err syscall.Errno) +TEXT ·pollBlock(SB),NOSPLIT,$0-16 CALL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVQ fds+0(FP), DI // PollFDs parameter - MOVQ nfds+8(FP), SI // Put nFDs parameter + MOVQ $N_EVTS, SI // Put nFDs parameter (constant N_EVTS) MOVQ $0x0, DX // Put timeout parameter (set to NULL) MOVQ $0x0, R10 // Put sigmask parameter (skip) MOVQ $SYS__PPOLL, AX // Prepare / perform ppoll() SYSCALL @@ -14,10 +15,10 @@ TEXT ·pollBlock(SB),NOSPLIT,$0-24 CMPQ AX, $0xfffffffffffff002 // No error / EINTR JLS success // Jump to success NEGQ AX // Negate SYSCALL errno - MOVQ AX, err+16(FP) // Store error code in err return value + MOVQ AX, err+8(FP) // Store error code in err return value CALL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return success: - MOVQ $0, err+16(FP) // Store NULL error code in err return value + MOVQ $0, err+8(FP) // Store NULL error code in err return value CALL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return diff --git a/event/poll_arm.s b/event/poll_arm.s index 38ed490..1ced0ed 100644 --- a/event/poll_arm.s +++ b/event/poll_arm.s @@ -1,24 +1,25 @@ #include "textflag.h" #define SYS__PPOLL 0x150 +#define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd, nfds int) (err syscall.Errno) -TEXT ·pollBlock(SB),NOSPLIT,$0-12 +// func pollBlock(fds *unix.PollFd) (err syscall.Errno) +TEXT ·pollBlock(SB),NOSPLIT,$0-8 BL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVW $SYS__PPOLL, R7 // Prepare / perform ppoll() SYSCALL MOVW fds+0(FP), R0 // PollFDs parameter - MOVW nfds+4(FP), R1 // Put nFDs parameter + MOVW $N_EVTS, R1 // Put nFDs parameter (constant N_EVTS) MOVW $0x0, R2 // Put timeout parameter (set to NULL) MOVW $0x0, R3 // Put sigmask parameter (skip) SWI $0 CMP $0xfffff002, R0 // No error / EINTR BLS success // Jump to success RSB $0, R0, R0 // Negate SYSCALL errno - MOVW R0, err+8(FP) // Store error code in err return value + MOVW R0, err+4(FP) // Store error code in err return value BL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return success: MOVW $0, R0 - MOVW R0, err+8(FP) // Store NULL error code in err return value + MOVW R0, err+4(FP) // Store NULL error code in err return value BL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return diff --git a/event/poll_arm64.s b/event/poll_arm64.s index 1570825..2bc1757 100644 --- a/event/poll_arm64.s +++ b/event/poll_arm64.s @@ -1,12 +1,13 @@ #include "textflag.h" #define SYS__PPOLL 0x49 +#define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd, nfds int) (err syscall.Errno) -TEXT ·pollBlock(SB),NOSPLIT,$0-24 +// func pollBlock(fds *unix.PollFd) (err syscall.Errno) +TEXT ·pollBlock(SB),NOSPLIT,$0-16 BL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVD fds+0(FP), R0 // PollFDs parameter - MOVD nfds+8(FP), R1 // Put nFDs parameter + MOVD $N_EVTS, R1 // Put nFDs parameter (constant N_EVTS) MOVD $0x0, R2 // Put timeout parameter (set to NULL) MOVD $0x0, R3 // Put sigmask parameter (skip) MOVD $SYS__PPOLL, R8 // Prepare / perform ppoll() SYSCALL @@ -14,10 +15,10 @@ TEXT ·pollBlock(SB),NOSPLIT,$0-24 CMP $0xfffffffffffff002, R0 // No error / EINTR BLS success // Jump to success NEG R0, R0 // Negate SYSCALL errno - MOVD R0, err+16(FP) // Store error code in err return value + MOVD R0, err+8(FP) // Store error code in err return value BL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return success: - MOVD $0, err+16(FP) // Store NULL error code in err return value + MOVD $0, err+8(FP) // Store NULL error code in err return value BL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return diff --git a/event/poll_asm.go b/event/poll_asm.go index 90596e6..b85c20d 100644 --- a/event/poll_asm.go +++ b/event/poll_asm.go @@ -4,25 +4,22 @@ package event import ( - "golang.org/x/sys/unix" - _ "unsafe" // required to support go:linkname + + "golang.org/x/sys/unix" ) //go:noescape //go:nosplit -func pollBlock(fds *unix.PollFd, nfds int) unix.Errno +//go:norace +func pollBlock(fds *unix.PollFd) unix.Errno //////////////////////////////////// -// The following stubs are required to allow unsafe access to their equivalent in the runtime package from assembly +// The following stub is required to allow unsafe access to their equivalent in the runtime package from assembly // This might break in the future, but there various sources that claim that it's widely used (and even issues at least // imply that it's ok to use in favor of exposing that functionality (c.f. https://github.com/golang/go/issues/29734) //go:linkname entersyscallblock runtime.entersyscallblock //go:noescape func entersyscallblock() //nolint:deadcode - -//go:linkname exitsyscall runtime.exitsyscall -//go:noescape -func exitsyscall() //nolint:deadcode diff --git a/event/poll_default.go b/event/poll_default.go index 8c8a1fd..82a15ba 100644 --- a/event/poll_default.go +++ b/event/poll_default.go @@ -9,11 +9,13 @@ import ( "golang.org/x/sys/unix" ) -func pollBlock(fds *unix.PollFd, nfds int) unix.Errno { +const nPollEvents = uintptr(0x02) + +func pollBlock(fds *unix.PollFd) unix.Errno { // #nosec: G103 _, _, e := unix.Syscall6(unix.SYS_PPOLL, uintptr(unsafe.Pointer(fds)), - uintptr(nfds), uintptr(unsafe.Pointer(nil)), 0, 0, 0) + nPollEvents, uintptr(unsafe.Pointer(nil)), 0, 0, 0) return e } diff --git a/event/poll_mock.go b/event/poll_mock.go index 0a3c052..76beafa 100644 --- a/event/poll_mock.go +++ b/event/poll_mock.go @@ -26,7 +26,7 @@ type Handler struct { // Poll polls (blocking, hence no timeout) for events on the file descriptor and the event // file descriptor (waiting for a POLLIN event). -func (p *Handler) Poll(events int16) (bool, unix.Errno) { +func (p *Handler) Poll(events int16) (hasEvent bool, errno unix.Errno) { pollEvents := [...]unix.PollFd{ { Fd: int32(p.Efd), @@ -38,21 +38,25 @@ func (p *Handler) Poll(events int16) (bool, unix.Errno) { }, } - // Fast path: If this is not a MockHandler, simply return a regular poll + // Perform blocking PPOLL + errno = pollBlock(&pollEvents[0]) + if errno == 0 && pollEvents[1].Revents&eventConnReset != 0 { + errno = unix.ECONNRESET + } + hasEvent = pollEvents[0].Revents&eventPollIn != 0 + + // Fast path: If this is not a MockHandler, simply return if p.mockFd == nil { - return poll(pollEvents) + return } - // MockHandler logic: Poll, then release the semaphore, indicating data has + // MockHandler logic: Release the semaphore, indicating data has // been consumed - hasEvent, errno := poll(pollEvents) if !hasEvent && errno == 0 { - if errno := p.mockFd.ReleaseSemaphore(); errno != 0 { - return false, errno - } + errno = p.mockFd.ReleaseSemaphore() } - return hasEvent, errno + return } // Recvfrom retrieves data directly from the socket diff --git a/event/poll_nomock.go b/event/poll_nomock.go index 0260268..fd60d96 100644 --- a/event/poll_nomock.go +++ b/event/poll_nomock.go @@ -21,7 +21,7 @@ type Handler struct { // Poll polls (blocking, hence no timeout) for events on the file descriptor and the event // file descriptor (waiting for a POLLIN event). -func (p *Handler) Poll(events int16) (bool, unix.Errno) { +func (p *Handler) Poll(events int16) (hasEvent bool, errno unix.Errno) { pollEvents := [...]unix.PollFd{ { Fd: int32(p.Efd), @@ -33,7 +33,14 @@ func (p *Handler) Poll(events int16) (bool, unix.Errno) { }, } - return poll(pollEvents) + // Perform blocking PPOLL + errno = pollBlock(&pollEvents[0]) + if errno == 0 && pollEvents[1].Revents&eventConnReset != 0 { + errno = unix.ECONNRESET + } + hasEvent = pollEvents[0].Revents&eventPollIn != 0 + + return } // Recvfrom retrieves data directly from the socket From f3b23d873dc87760073df1f57fe46c0ae76b4862 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Tue, 12 Sep 2023 14:00:02 +0200 Subject: [PATCH 06/15] TRIVIAL: Use unix.XXX instead of syscall.XXX across package for consistency --- event/event_mock_test.go | 5 ++--- event/poll_386.s | 2 +- event/poll_amd64.s | 2 +- event/poll_arm.s | 2 +- event/poll_arm64.s | 2 +- examples/trace/trace.go | 4 ++-- link/interface_linux.go | 5 +++-- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/event/event_mock_test.go b/event/event_mock_test.go index 409b96f..156d2aa 100644 --- a/event/event_mock_test.go +++ b/event/event_mock_test.go @@ -5,7 +5,6 @@ package event import ( "errors" - "syscall" "testing" "time" @@ -57,13 +56,13 @@ func TestPollOnClosedFD(t *testing.T) { efdHasEvent, errno := handler.Poll(unix.POLLIN | unix.POLLERR) require.True(t, efdHasEvent) - require.Equal(t, syscall.Errno(0x0), errno) + require.Equal(t, unix.Errno(0x0), errno) _, err = handler.Efd.ReadEvent() require.Nil(t, err) for i := 0; i < 10; i++ { efdHasEvent, errno := handler.Poll(unix.POLLIN | unix.POLLERR) require.False(t, efdHasEvent) - require.Equal(t, syscall.Errno(0x9), errno) + require.Equal(t, unix.Errno(0x9), errno) } } diff --git a/event/poll_386.s b/event/poll_386.s index 36dd127..58896b7 100644 --- a/event/poll_386.s +++ b/event/poll_386.s @@ -4,7 +4,7 @@ #define SYS__PPOLL 0x135 #define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd) (err syscall.Errno) +// func pollBlock(fds *unix.PollFd) (err unix.Errno) TEXT ·pollBlock(SB),NOSPLIT,$0-8 CALL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVL $SYS__PPOLL, AX // Prepare / perform ppoll() SYSCALL diff --git a/event/poll_amd64.s b/event/poll_amd64.s index 670a63e..c97d438 100644 --- a/event/poll_amd64.s +++ b/event/poll_amd64.s @@ -3,7 +3,7 @@ #define SYS__PPOLL 0x10f #define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd) (err syscall.Errno) +// func pollBlock(fds *unix.PollFd) (err unix.Errno) TEXT ·pollBlock(SB),NOSPLIT,$0-16 CALL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVQ fds+0(FP), DI // PollFDs parameter diff --git a/event/poll_arm.s b/event/poll_arm.s index 1ced0ed..9a863ae 100644 --- a/event/poll_arm.s +++ b/event/poll_arm.s @@ -3,7 +3,7 @@ #define SYS__PPOLL 0x150 #define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd) (err syscall.Errno) +// func pollBlock(fds *unix.PollFd) (err unix.Errno) TEXT ·pollBlock(SB),NOSPLIT,$0-8 BL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVW $SYS__PPOLL, R7 // Prepare / perform ppoll() SYSCALL diff --git a/event/poll_arm64.s b/event/poll_arm64.s index 2bc1757..b337b9b 100644 --- a/event/poll_arm64.s +++ b/event/poll_arm64.s @@ -3,7 +3,7 @@ #define SYS__PPOLL 0x49 #define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd) (err syscall.Errno) +// func pollBlock(fds *unix.PollFd) (err unix.Errno) TEXT ·pollBlock(SB),NOSPLIT,$0-16 BL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVD fds+0(FP), R0 // PollFDs parameter diff --git a/examples/trace/trace.go b/examples/trace/trace.go index f16a0f7..a104f5b 100644 --- a/examples/trace/trace.go +++ b/examples/trace/trace.go @@ -9,12 +9,12 @@ import ( "sort" "strings" "sync" - "syscall" "github.com/fako1024/slimcap/capture" "github.com/fako1024/slimcap/capture/afpacket/afpacket" "github.com/fako1024/slimcap/capture/afpacket/afring" "github.com/fako1024/slimcap/link" + "golang.org/x/sys/unix" ) // Capture denotes a simple capturing structure / manager @@ -163,7 +163,7 @@ func (c *Capture) Run() (err error) { logger.Infof("attempting capture on interfaces [%s]", strings.Join(capturing, ",")) sigExitChan := make(chan os.Signal, 1) - signal.Notify(sigExitChan, syscall.SIGTERM, os.Interrupt) + signal.Notify(sigExitChan, unix.SIGTERM, os.Interrupt) var listeners []capture.Source // Fork a goroutine for each interface diff --git a/link/interface_linux.go b/link/interface_linux.go index 78fad8f..2ac09e4 100644 --- a/link/interface_linux.go +++ b/link/interface_linux.go @@ -10,7 +10,8 @@ import ( "os" "strconv" "strings" - "syscall" + + "golang.org/x/sys/unix" ) const ( @@ -60,7 +61,7 @@ func (i Interface) IsUp() (bool, error) { return false, err } - return flags&syscall.IFF_UP != 0, nil + return flags&unix.IFF_UP != 0, nil } //////////////////////////////////////////////////////////////////////////////// From c9955baa654b9173df99ec8b6f874ad869597115 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Tue, 12 Sep 2023 14:01:47 +0200 Subject: [PATCH 07/15] Extend afring benchmarks to cover different ring buffer sizes --- capture/afpacket/afring/afring_mock_test.go | 205 ++++++++++---------- 1 file changed, 106 insertions(+), 99 deletions(-) diff --git a/capture/afpacket/afring/afring_mock_test.go b/capture/afpacket/afring/afring_mock_test.go index 1527c57..b6cfecc 100644 --- a/capture/afpacket/afring/afring_mock_test.go +++ b/capture/afpacket/afring/afring_mock_test.go @@ -385,114 +385,121 @@ func BenchmarkCaptureMethods(b *testing.B) { 6, []byte{1, 2}, 0, 128) require.Nil(b, err) - // Setup a mock source - mockSrc, err := NewMockSourceNoDrain("mock", - CaptureLength(link.CaptureLengthMinimalIPv4Transport), - Promiscuous(false), - ) - require.Nil(b, err) - - for mockSrc.CanAddPackets() { - require.Nil(b, mockSrc.AddPacket(testPacket)) - } - _, err = mockSrc.Run(time.Microsecond) - require.Nil(b, err) - - b.Run("NextPacket", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - p, _ := mockSrc.NextPacket(nil) - _ = p - } - }) - - b.Run("NextPacketInPlace", func(b *testing.B) { - var p capture.Packet = mockSrc.NewPacket() - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - p, _ = mockSrc.NextPacket(p) - _ = p - } - }) - - b.Run("NextPayload", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextPayload(nil) - _ = p - _ = pktType - _ = totalLen + for _, bufSize := range [][2]int{ + {10 * 1024, 512}, + {10 * 1024 * 1024, 4}, + } { + + // Setup a mock source + mockSrc, err := NewMockSourceNoDrain("mock", + CaptureLength(link.CaptureLengthMinimalIPv4Transport), + BufferSize(bufSize[0], bufSize[1]), + Promiscuous(false), + ) + require.Nil(b, err) + + for mockSrc.CanAddPackets() { + require.Nil(b, mockSrc.AddPacket(testPacket)) } - }) + _, err = mockSrc.Run(time.Microsecond) + require.Nil(b, err) + + b.Run(fmt.Sprintf("NextPacket_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + p, _ := mockSrc.NextPacket(nil) + _ = p + } + }) - b.Run("NextPayloadInPlace", func(b *testing.B) { - pkt := mockSrc.NewPacket() - var p []byte = pkt.Payload() - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextPayload(p) - _ = p - _ = pktType - _ = totalLen - } - }) + b.Run(fmt.Sprintf("NextPacketInPlace_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + var p capture.Packet = mockSrc.NewPacket() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + p, _ = mockSrc.NextPacket(p) + _ = p + } + }) - b.Run("NextPayloadZeroCopy", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextPayloadZeroCopy() - _ = p - _ = pktType - _ = totalLen - } - }) + b.Run(fmt.Sprintf("NextPayload_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextPayload(nil) + _ = p + _ = pktType + _ = totalLen + } + }) - b.Run("NextIPPacket", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextIPPacket(nil) - _ = p - _ = pktType - _ = totalLen - } - }) + b.Run(fmt.Sprintf("NextPayloadInPlace_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + pkt := mockSrc.NewPacket() + var p []byte = pkt.Payload() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextPayload(p) + _ = p + _ = pktType + _ = totalLen + } + }) - b.Run("NextIPPacketInPlace", func(b *testing.B) { - pkt := mockSrc.NewPacket() - var p capture.IPLayer = pkt.IPLayer() - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextIPPacket(p) - _ = p - _ = pktType - _ = totalLen - } - }) + b.Run(fmt.Sprintf("NextPayloadZeroCopy_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextPayloadZeroCopy() + _ = p + _ = pktType + _ = totalLen + } + }) - b.Run("NextIPPacketZeroCopy", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextIPPacketZeroCopy() - _ = p - _ = pktType - _ = totalLen - } - }) + b.Run(fmt.Sprintf("NextIPPacket_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextIPPacket(nil) + _ = p + _ = pktType + _ = totalLen + } + }) - b.Run("NextPacketFn", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - _ = mockSrc.NextPacketFn(func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) error { - _ = payload + b.Run(fmt.Sprintf("NextIPPacketInPlace_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + pkt := mockSrc.NewPacket() + var p capture.IPLayer = pkt.IPLayer() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextIPPacket(p) + _ = p + _ = pktType _ = totalLen + } + }) + + b.Run(fmt.Sprintf("NextIPPacketZeroCopy_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextIPPacketZeroCopy() + _ = p _ = pktType - return nil - }) - } - }) + _ = totalLen + } + }) + + b.Run(fmt.Sprintf("NextPacketFn_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = mockSrc.NextPacketFn(func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) error { + _ = payload + _ = totalLen + _ = pktType + return nil + }) + } + }) + } } func testCaptureMethods(t *testing.T, fn func(t *testing.T, _ *MockSource, _, _ uint16)) { From d46c7b23d06658b2dafe3c617cda19c4bd6f8ad4 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Wed, 13 Sep 2023 16:20:56 +0200 Subject: [PATCH 08/15] Provide slimcap_noasm build tag to disable assembler optimizations on demand --- event/poll_386.s | 2 ++ event/poll_amd64.s | 2 ++ event/poll_arm.s | 2 ++ event/poll_arm64.s | 2 ++ event/poll_asm.go | 3 ++- event/poll_default.go | 4 ++-- 6 files changed, 12 insertions(+), 3 deletions(-) diff --git a/event/poll_386.s b/event/poll_386.s index 58896b7..803c4fe 100644 --- a/event/poll_386.s +++ b/event/poll_386.s @@ -1,3 +1,5 @@ +// +build !slimcap_noasm + #include "textflag.h" #define INVOKE_SYSCALL INT $0x80 diff --git a/event/poll_amd64.s b/event/poll_amd64.s index c97d438..0738512 100644 --- a/event/poll_amd64.s +++ b/event/poll_amd64.s @@ -1,3 +1,5 @@ +// +build !slimcap_noasm + #include "textflag.h" #define SYS__PPOLL 0x10f diff --git a/event/poll_arm.s b/event/poll_arm.s index 9a863ae..2761d12 100644 --- a/event/poll_arm.s +++ b/event/poll_arm.s @@ -1,3 +1,5 @@ +// +build !slimcap_noasm + #include "textflag.h" #define SYS__PPOLL 0x150 diff --git a/event/poll_arm64.s b/event/poll_arm64.s index b337b9b..a65021b 100644 --- a/event/poll_arm64.s +++ b/event/poll_arm64.s @@ -1,3 +1,5 @@ +// +build !slimcap_noasm + #include "textflag.h" #define SYS__PPOLL 0x49 diff --git a/event/poll_asm.go b/event/poll_asm.go index b85c20d..9c349ae 100644 --- a/event/poll_asm.go +++ b/event/poll_asm.go @@ -1,5 +1,6 @@ -//go:build (linux && amd64) || (linux && arm64) || (linux && arm) || (linux && 386) +//go:build ((linux && amd64) || (linux && arm64) || (linux && arm) || (linux && 386)) && !slimcap_noasm // +build linux,amd64 linux,arm64 linux,arm linux,386 +// +build !slimcap_noasm package event diff --git a/event/poll_default.go b/event/poll_default.go index 82a15ba..0d1ef36 100644 --- a/event/poll_default.go +++ b/event/poll_default.go @@ -1,5 +1,5 @@ -//go:build linux && !amd64 && !arm64 && !arm && !386 -// +build linux,!amd64,!arm64,!arm,!386 +//go:build (linux && !amd64 && !arm64 && !arm && !386) || slimcap_noasm +// +build linux,!amd64,!arm64,!arm,!386 slimcap_noasm package event From 1dfc64df857535aae9adcb0ca49714276451703b Mon Sep 17 00:00:00 2001 From: fako1024 Date: Thu, 14 Sep 2023 10:10:24 +0200 Subject: [PATCH 09/15] Reduce general benchmark overhead from scheduler by locking population goroutine to OS thread --- capture/afpacket/afring/afring_mock_nodrain.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/capture/afpacket/afring/afring_mock_nodrain.go b/capture/afpacket/afring/afring_mock_nodrain.go index 706d7a5..ae9007f 100644 --- a/capture/afpacket/afring/afring_mock_nodrain.go +++ b/capture/afpacket/afring/afring_mock_nodrain.go @@ -5,6 +5,7 @@ package afring import ( "errors" + "runtime" "sync" "sync/atomic" "time" @@ -68,6 +69,8 @@ func (m *MockSourceNoDrain) Run(releaseInterval time.Duration) (<-chan error, er m.wgRunning.Add(1) go func(errs chan error) { + // Minimize scheduler overhead by locking this goroutine to the current thread + runtime.LockOSThread() defer func() { close(errs) m.wgRunning.Done() From 5237d8db0d172667657e29d33acd9b3ac98fe102 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Thu, 14 Sep 2023 11:00:32 +0200 Subject: [PATCH 10/15] Ensure mock sources are cleaned up properly during benchmarks --- capture/afpacket/afring/afring_mock_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/capture/afpacket/afring/afring_mock_test.go b/capture/afpacket/afring/afring_mock_test.go index b6cfecc..9db720f 100644 --- a/capture/afpacket/afring/afring_mock_test.go +++ b/capture/afpacket/afring/afring_mock_test.go @@ -499,6 +499,8 @@ func BenchmarkCaptureMethods(b *testing.B) { }) } }) + + require.Nil(b, mockSrc.Close()) } } From 1259acafb9af9b937f4a31d04e07929b77f9d7c0 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Fri, 15 Sep 2023 14:35:15 +0200 Subject: [PATCH 11/15] Improve mock test / benchmark type consistency and benchmark parameters --- capture/afpacket/afring/afring_mock.go | 15 ++++++++-- capture/afpacket/afring/afring_mock_test.go | 32 ++++++++++++--------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/capture/afpacket/afring/afring_mock.go b/capture/afpacket/afring/afring_mock.go index 94cfc06..49e3e2b 100644 --- a/capture/afpacket/afring/afring_mock.go +++ b/capture/afpacket/afring/afring_mock.go @@ -22,6 +22,16 @@ const ( blockStatusPollInterval = 10 * time.Millisecond ) +type tPacketHeaderV3Mock struct { + snaplen uint32 // 12-16 + pktLen uint32 // 16-20 + _ uint32 // skip + pktMac uint16 // 24-26 + pktNet uint16 // 26-28 + _ [15]uint16 // skip + pktType byte // 58 +} + // MockSource denotes a fully mocked ring buffer source, behaving just like one // Since it wraps a regular Source, it can be used as a stand-in replacement without any further // code modifications: @@ -130,10 +140,11 @@ func (m *MockSource) addPacket(payload []byte, totalLen uint32, pktType, ipLayer block := m.ringBuffer.ring[thisBlock*m.blockSize : thisBlock*m.blockSize+m.blockSize] - *(*tPacketHeaderV3)(unsafe.Pointer(&block[m.curBlockPos+12])) = tPacketHeaderV3{ + *(*tPacketHeaderV3Mock)(unsafe.Pointer(&block[m.curBlockPos+12])) = tPacketHeaderV3Mock{ snaplen: uint32(m.snapLen), pktLen: totalLen, - pktPos: mac, + pktMac: mac, + pktNet: mac + uint16(m.ipLayerOffset), pktType: pktType, } // #nosec: G103 copy(block[m.curBlockPos+mac:m.curBlockPos+mac+m.snapLen], payload) // payload diff --git a/capture/afpacket/afring/afring_mock_test.go b/capture/afpacket/afring/afring_mock_test.go index 9db720f..b06aa15 100644 --- a/capture/afpacket/afring/afring_mock_test.go +++ b/capture/afpacket/afring/afring_mock_test.go @@ -385,15 +385,19 @@ func BenchmarkCaptureMethods(b *testing.B) { 6, []byte{1, 2}, 0, 128) require.Nil(b, err) - for _, bufSize := range [][2]int{ - {10 * 1024, 512}, - {10 * 1024 * 1024, 4}, + for _, benchConfig := range []struct { + blockSize int + nBlocks int + blockExpiry time.Duration + }{ + {10 * 1024 * 1024, 4, time.Microsecond}, + {10 * 1024, 512, 10 * time.Nanosecond}, } { // Setup a mock source mockSrc, err := NewMockSourceNoDrain("mock", CaptureLength(link.CaptureLengthMinimalIPv4Transport), - BufferSize(bufSize[0], bufSize[1]), + BufferSize(benchConfig.blockSize, benchConfig.nBlocks), Promiscuous(false), ) require.Nil(b, err) @@ -401,10 +405,10 @@ func BenchmarkCaptureMethods(b *testing.B) { for mockSrc.CanAddPackets() { require.Nil(b, mockSrc.AddPacket(testPacket)) } - _, err = mockSrc.Run(time.Microsecond) + _, err = mockSrc.Run(benchConfig.blockExpiry) require.Nil(b, err) - b.Run(fmt.Sprintf("NextPacket_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.Run(fmt.Sprintf("NextPacket_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { p, _ := mockSrc.NextPacket(nil) @@ -412,7 +416,7 @@ func BenchmarkCaptureMethods(b *testing.B) { } }) - b.Run(fmt.Sprintf("NextPacketInPlace_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.Run(fmt.Sprintf("NextPacketInPlace_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { var p capture.Packet = mockSrc.NewPacket() b.ReportAllocs() b.ResetTimer() @@ -422,7 +426,7 @@ func BenchmarkCaptureMethods(b *testing.B) { } }) - b.Run(fmt.Sprintf("NextPayload_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.Run(fmt.Sprintf("NextPayload_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { p, pktType, totalLen, _ := mockSrc.NextPayload(nil) @@ -432,7 +436,7 @@ func BenchmarkCaptureMethods(b *testing.B) { } }) - b.Run(fmt.Sprintf("NextPayloadInPlace_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.Run(fmt.Sprintf("NextPayloadInPlace_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { pkt := mockSrc.NewPacket() var p []byte = pkt.Payload() b.ReportAllocs() @@ -445,7 +449,7 @@ func BenchmarkCaptureMethods(b *testing.B) { } }) - b.Run(fmt.Sprintf("NextPayloadZeroCopy_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.Run(fmt.Sprintf("NextPayloadZeroCopy_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { p, pktType, totalLen, _ := mockSrc.NextPayloadZeroCopy() @@ -455,7 +459,7 @@ func BenchmarkCaptureMethods(b *testing.B) { } }) - b.Run(fmt.Sprintf("NextIPPacket_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.Run(fmt.Sprintf("NextIPPacket_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { p, pktType, totalLen, _ := mockSrc.NextIPPacket(nil) @@ -465,7 +469,7 @@ func BenchmarkCaptureMethods(b *testing.B) { } }) - b.Run(fmt.Sprintf("NextIPPacketInPlace_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.Run(fmt.Sprintf("NextIPPacketInPlace_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { pkt := mockSrc.NewPacket() var p capture.IPLayer = pkt.IPLayer() b.ReportAllocs() @@ -478,7 +482,7 @@ func BenchmarkCaptureMethods(b *testing.B) { } }) - b.Run(fmt.Sprintf("NextIPPacketZeroCopy_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.Run(fmt.Sprintf("NextIPPacketZeroCopy_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { p, pktType, totalLen, _ := mockSrc.NextIPPacketZeroCopy() @@ -488,7 +492,7 @@ func BenchmarkCaptureMethods(b *testing.B) { } }) - b.Run(fmt.Sprintf("NextPacketFn_%dkiBx%d", bufSize[0]/1000, bufSize[1]), func(b *testing.B) { + b.Run(fmt.Sprintf("NextPacketFn_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { _ = mockSrc.NextPacketFn(func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) error { From eeaf39e0981f804a3080db5e07e43fbd8f7b57cf Mon Sep 17 00:00:00 2001 From: fako1024 Date: Mon, 18 Sep 2023 09:36:32 +0200 Subject: [PATCH 12/15] Refactor individual packet retrieval methods for optimal inlining --- capture/afpacket/afring/afring.go | 234 ++++++++++++--------- capture/afpacket/afring/afring_zerocopy.go | 171 +++++++++++++++ capture/afpacket/afring/ring.go | 33 +++ capture/afpacket/afring/tpacket.go | 80 +------ 4 files changed, 345 insertions(+), 173 deletions(-) create mode 100644 capture/afpacket/afring/afring_zerocopy.go create mode 100644 capture/afpacket/afring/ring.go diff --git a/capture/afpacket/afring/afring.go b/capture/afpacket/afring/afring.go index 3c8d961..b9092e8 100644 --- a/capture/afpacket/afring/afring.go +++ b/capture/afpacket/afring/afring.go @@ -28,18 +28,6 @@ const ( DefaultSnapLen = (1 << 16) // DefaultSnapLen : 64 kiB ) -type ringBuffer struct { - ring []byte - - tpReq tPacketRequest - curTPacketHeader *tPacketHeader - offset int -} - -func (b *ringBuffer) nextTPacketHeader() { - b.curTPacketHeader.data = b.ring[b.offset*int(b.tpReq.blockSize):] -} - // Source denotes an AF_PACKET capture source making use of a ring buffer type Source struct { eventHandler *event.Handler @@ -51,8 +39,7 @@ type Source struct { link *link.Link ipLayerOffsetNum uint32 - - unblocked bool + filter byte ringBuffer sync.Mutex @@ -89,6 +76,7 @@ func NewSourceFromLink(link *link.Link, options ...Option) (*Source, error) { nBlocks: tPacketDefaultBlockNr, ipLayerOffset: link.Type.IPHeaderOffset(), link: link, + filter: link.FilterMask(), } src.ipLayerOffsetNum = uint32(src.ipLayerOffset) @@ -148,13 +136,32 @@ func (s *Source) NextPacket(pBuf capture.Packet) (pkt capture.Packet, err error) return } + pktHdr := s.curTPacketHeader + + // Parse the V3 TPacketHeader, the first byte of the payload and snaplen + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktMac) + effectiveSnapLen := capture.PacketHdrOffset + int(hdr.snaplen) + // If a buffer was provided, extend it to maximum capacity - if pBuf != nil { + if pBuf == nil { + + // Allocate new capture.Packet if no buffer was provided + pkt = make(capture.Packet, effectiveSnapLen) + } else { pkt = pBuf[:cap(pBuf)] } - // Populate the packet / buffer - pkt = s.curTPacketHeader.packetPut(pkt, s.ipLayerOffset) + // Extract / copy all required data / header parameters + pktHdr.pktLenCopy(pkt[2:capture.PacketHdrOffset]) + pkt[0] = pktHdr.data[pktHdr.ppos+58] + pkt[1] = s.ipLayerOffset + copy(pkt[capture.PacketHdrOffset:], pktHdr.data[pos:pos+hdr.snaplen]) + + // Ensure correct packet length + if effectiveSnapLen < len(pkt) { + pkt = pkt[:effectiveSnapLen] + } return } @@ -169,28 +176,32 @@ func (s *Source) NextPayload(pBuf []byte) (payload []byte, pktType capture.Packe return } + pktHdr := s.curTPacketHeader + + // Parse the V3 TPacketHeader, the first byte of the payload and snaplen + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktMac) + snapLen := int(hdr.snaplen) + // If a buffer was provided, extend it to maximum capacity if pBuf != nil { payload = pBuf[:cap(pBuf)] - } - - // Populate the payload / buffer & parameters - payload, pktType, pktLen = s.curTPacketHeader.payloadPut(payload, 0) + } else { - return -} + // Allocate new capture.Packet if no buffer was provided + payload = make([]byte, snapLen) + } -// NextPayloadZeroCopy receives the raw payload of the next packet from the source and returns it. The operation is blocking. -// The returned payload provides direct zero-copy access to the underlying data source (e.g. a ring buffer). -func (s *Source) NextPayloadZeroCopy() (payload []byte, pktType capture.PacketType, pktLen uint32, err error) { + // Copy payload / IP layer + copy(payload, pktHdr.data[pos:pos+hdr.snaplen]) - if err = s.nextPacket(); err != nil { - pktType = capture.PacketUnknown - return + // Ensure correct data length + if snapLen < len(payload) { + payload = payload[:snapLen] } - // Extract the payload (zero-copy) & parameters - payload, pktType, pktLen = s.curTPacketHeader.payloadZeroCopy(0) + // Populate the payload / buffer & parameters + pktType, pktLen = pktHdr.data[pktHdr.ppos+58], hdr.pktLen return } @@ -205,28 +216,38 @@ func (s *Source) NextIPPacket(pBuf capture.IPLayer) (ipLayer capture.IPLayer, pk return } + pktHdr := s.curTPacketHeader + + // Parse the V3 TPacketHeader and the first byte of the payload + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktNet) + + // Adjust effective snaplen (subtracting any potential mac layer) + effectiveSnapLen := hdr.snaplen + if s.ipLayerOffsetNum > 0 { + effectiveSnapLen -= s.ipLayerOffsetNum + } + snapLen := int(effectiveSnapLen) + // If a buffer was provided, extend it to maximum capacity if pBuf != nil { ipLayer = pBuf[:cap(pBuf)] - } - - // Populate the IP layer / buffer & parameters - ipLayer, pktType, pktLen = s.curTPacketHeader.payloadPut(ipLayer, s.ipLayerOffsetNum) + } else { - return -} + // Allocate new capture.Packet if no buffer was provided + ipLayer = make([]byte, snapLen) + } -// NextIPPacketZeroCopy receives the IP layer of the next packet from the source and returns it. The operation is blocking. -// The returned IPLayer provides direct zero-copy access to the underlying data source (e.g. a ring buffer). -func (s *Source) NextIPPacketZeroCopy() (ipLayer capture.IPLayer, pktType capture.PacketType, pktLen uint32, err error) { + // Copy payload / IP layer + copy(ipLayer, pktHdr.data[pos:pos+effectiveSnapLen]) - if err = s.nextPacket(); err != nil { - pktType = capture.PacketUnknown - return + // Ensure correct data length + if snapLen < len(ipLayer) { + ipLayer = ipLayer[:snapLen] } - // Extract the IP layer (zero-copy) & parameters - ipLayer, pktType, pktLen = s.curTPacketHeader.payloadZeroCopy(s.ipLayerOffsetNum) + // Populate the payload / buffer & parameters + pktType, pktLen = pktHdr.data[pktHdr.ppos+58], hdr.pktLen return } @@ -240,10 +261,17 @@ func (s *Source) NextPacketFn(fn func(payload []byte, totalLen uint32, pktType c return err } - // Extract the payload (zero-copy) & parameters - payload, pktType, pktLen := s.curTPacketHeader.payloadZeroCopy(0) + pktHdr := s.curTPacketHeader - return fn(payload, pktLen, pktType, s.ipLayerOffset) + // Parse the V3 TPacketHeader and the first byte of the payload + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktMac) + + // #nosec G103 + return fn(pktHdr.data[pos:pos+*(*uint32)(unsafe.Pointer(&pktHdr.data[pktHdr.ppos+12]))], + *(*uint32)(unsafe.Pointer(&pktHdr.data[pktHdr.ppos+16])), + pktHdr.data[pktHdr.ppos+58], + s.ipLayerOffset) } // Stats returns (and clears) the packet counters of the underlying source @@ -304,74 +332,70 @@ func (s *Source) Link() *link.Link { return s.link } +// nextPacket provides access to the next packet from either the current block or advances to the next +// one (fetching its first packet). func (s *Source) nextPacket() error { - // If the current TPacketHeader does not contain any more packets (or is uninitialized) - // fetch a new one from the ring buffer -fetch: - if s.curTPacketHeader.data == nil || s.unblocked { - if !s.unblocked { - s.nextTPacketHeader() +retry: + pktHdr := s.curTPacketHeader + + // If there is an active block, attempt to simply consume a packet from it + if pktHdr.data != nil { + + // If there are more packets remaining (i.e. there is a non-zero next offset), advance + // the current position. + // According to https://github.com/torvalds/linux/blame/master/net/packet/af_packet.c#L811 the + // tp_next_offset field is guaranteed to be zero for the final packet of the block. In addition, + // it cannot be zero otherwise (because that would be an invalid block). + if nextPos := pktHdr.nextOffset(); nextPos != 0 { + + // Update position of next packet and jump to the end + pktHdr.ppos += nextPos + goto finalize } - for s.curTPacketHeader.getStatus()&unix.TP_STATUS_USER == 0 || s.unblocked { - // Unset the bypass marker - if s.unblocked { - s.unblocked = false - } + // If there is no next offset, release the TPacketHeader to the kernel and move on to the next block + s.releaseAndAdvance() + } - // Run a PPOLL on the file descriptor, fetching a new block into the ring buffer - efdHasEvent, errno := s.eventHandler.Poll(unix.POLLIN | unix.POLLERR) + // Load the data for the block + s.loadTPacketHeader() - // If an event was received, ensure that the respective error is returned - // immediately (setting the `unblocked` marker to bypass checks done before - // upon next entry into this method) - if efdHasEvent { - return s.handleEvent() - } + // Check if the block is free to access in userland + for pktHdr.getStatus()&unix.TP_STATUS_USER == 0 { - // Handle errors - if errno != 0 { - if errno == unix.EINTR { - continue - } - if errno == unix.EBADF { - return capture.ErrCaptureStopped - } - return fmt.Errorf("error polling for next packet: %w (errno %d)", errno, int(errno)) - } + // Run a PPOLL on the file descriptor (waiting for the block to become available) + efdHasEvent, errno := s.eventHandler.Poll(unix.POLLIN | unix.POLLERR) - // Handle rare cases of runaway packets - if s.curTPacketHeader.getStatus()&unix.TP_STATUS_COPY != 0 { - s.curTPacketHeader.setStatus(unix.TP_STATUS_KERNEL) - s.offset = (s.offset + 1) % int(s.tpReq.blockNr) - s.nextTPacketHeader() + // If an event was received, ensure that the respective error / code is returned + // immediately + if efdHasEvent { + return s.handleEvent() + } + // Handle potential PPOLL errors + if errno != 0 { + if errno == unix.EINTR { continue } + return handlePollError(errno) } - // After fetching a new TPacketHeader, set the position of the first packet and the number of packets - // in this TPacketHeader - s.curTPacketHeader.ppos = s.curTPacketHeader.offsetToFirstPkt() - } else { - - // If there is no next offset, release the TPacketHeader to the kernel and fetch a new one - nextPos := s.curTPacketHeader.nextOffset() - if nextPos == 0 { - s.curTPacketHeader.setStatus(unix.TP_STATUS_KERNEL) - s.offset = (s.offset + 1) % int(s.tpReq.blockNr) - s.curTPacketHeader.data = nil - goto fetch + // Handle rare cases of runaway packets (this call will advance to the next block + // as a side effect in case of a detection) + if s.hasRunawayBlock() { + continue } - - // Update position of next packet - s.curTPacketHeader.ppos += nextPos } + // Set the position of the first packet in this block and jump to end + pktHdr.ppos = pktHdr.offsetToFirstPkt() + +finalize: + // Apply filter (if any) - if filter := s.link.FilterMask(); filter > 0 && filter&s.curTPacketHeader.packetType() != 0 { - goto fetch + if s.filter > 0 && s.filter&pktHdr.data[pktHdr.ppos+58] != 0 { + goto retry } return nil @@ -385,9 +409,10 @@ func (s *Source) handleEvent() error { return fmt.Errorf("error reading event: %w", err) } - // Set the bypass marker to allow for re-entry in nextPacket() where we left off if + // Unset the current block data to allow for re-entry in nextPacket[ZeroCopy]() where we left off if // required (e.g. on ErrCaptureUnblock) - s.unblocked = true + s.curTPacketHeader.data = nil + if efdData[7] > 0 { return capture.ErrCaptureStopped } @@ -424,3 +449,10 @@ func setupRingBuffer(sd socket.FileDescriptor, tPacketReq tPacketRequest) ([]byt return buf, eventFD, nil } + +func handlePollError(errno unix.Errno) error { + if errno == unix.EBADF { + return capture.ErrCaptureStopped + } + return fmt.Errorf("error polling for next packet: %w (errno %d)", errno, int(errno)) +} diff --git a/capture/afpacket/afring/afring_zerocopy.go b/capture/afpacket/afring/afring_zerocopy.go new file mode 100644 index 0000000..7a5c6c9 --- /dev/null +++ b/capture/afpacket/afring/afring_zerocopy.go @@ -0,0 +1,171 @@ +package afring + +import ( + "unsafe" + + "github.com/fako1024/slimcap/capture" + "golang.org/x/sys/unix" +) + +// NextPayloadZeroCopy receives the raw payload of the next packet from the source and returns it. The operation is blocking. +// The returned payload provides direct zero-copy access to the underlying data source (e.g. a ring buffer). +// Procedurally, the method extracts the next packet from either the current block or advances to the next +// one (fetching / returning its first packet). Using the offset parameter it supports extraction of +// both the full payload or the IP layer only. +func (s *Source) NextPayloadZeroCopy() (payload []byte, pktType capture.PacketType, pktLen uint32, err error) { + +retry: + pktHdr := s.curTPacketHeader + + // If there is an active block, attempt to simply consume a packet from it + if pktHdr.data != nil { + + // If there are more packets remaining (i.e. there is a non-zero next offset), advance + // the current position. + // According to https://github.com/torvalds/linux/blame/master/net/packet/af_packet.c#L811 the + // tp_next_offset field is guaranteed to be zero for the final packet of the block. In addition, + // it cannot be zero otherwise (because that would be an invalid block). + if nextPos := pktHdr.nextOffset(); nextPos != 0 { + + // Update position of next packet and jump to the end + pktHdr.ppos += nextPos + goto finalize + } + + // If there is no next offset, release the TPacketHeader to the kernel and move on to the next block + s.releaseAndAdvance() + } + + // Load the data for the block + s.loadTPacketHeader() + + // Check if the block is free to access in userland + for pktHdr.getStatus()&unix.TP_STATUS_USER == 0 { + + // Run a PPOLL on the file descriptor (waiting for the block to become available) + efdHasEvent, errno := s.eventHandler.Poll(unix.POLLIN | unix.POLLERR) + + // If an event was received, ensure that the respective error / code is returned + // immediately + if efdHasEvent { + pktType, err = capture.PacketUnknown, s.handleEvent() + return + } + + // Handle potential PPOLL errors + if errno != 0 { + if errno == unix.EINTR { + continue + } + pktType, err = capture.PacketUnknown, handlePollError(errno) + return + } + + // Handle rare cases of runaway packets (this call will advance to the next block + // as a side effect in case of a detection) + if s.hasRunawayBlock() { + continue + } + } + + // Set the position of the first packet in this block and jump to end + pktHdr.ppos = pktHdr.offsetToFirstPkt() + +finalize: + + // Apply filter (if any) + if s.filter > 0 && s.filter&pktHdr.data[pktHdr.ppos+58] != 0 { + goto retry + } + + // Parse the V3 TPacketHeader and the first byte of the payload + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktMac) + + // Return the payload / IP layer subslice & heeader parameters + return unsafe.Slice(&pktHdr.data[pos], hdr.snaplen), + pktHdr.data[pktHdr.ppos+58], + hdr.pktLen, nil + +} + +// NextIPPacketZeroCopy receives the IP layer of the next packet from the source and returns it. The operation is blocking. +// The returned IPLayer provides direct zero-copy access to the underlying data source (e.g. a ring buffer). +// Procedurally, the method extracts the next packet from either the current block or advances to the next +// one (fetching / returning its first packet). Using the offset parameter it supports extraction of +// both the full payload or the IP layer only. +func (s *Source) NextIPPacketZeroCopy() (ipLayer []byte, pktType capture.PacketType, pktLen uint32, err error) { + +retry: + pktHdr := s.curTPacketHeader + + // If there is an active block, attempt to simply consume a packet from it + if pktHdr.data != nil { + + // If there are more packets remaining (i.e. there is a non-zero next offset), advance + // the current position. + // According to https://github.com/torvalds/linux/blame/master/net/packet/af_packet.c#L811 the + // tp_next_offset field is guaranteed to be zero for the final packet of the block. In addition, + // it cannot be zero otherwise (because that would be an invalid block). + if nextPos := pktHdr.nextOffset(); nextPos != 0 { + + // Update position of next packet and jump to the end + pktHdr.ppos += nextPos + goto finalize + } + + // If there is no next offset, release the TPacketHeader to the kernel and move on to the next block + s.releaseAndAdvance() + } + + // Load the data for the block + s.loadTPacketHeader() + + // Check if the block is free to access in userland + for pktHdr.getStatus()&unix.TP_STATUS_USER == 0 { + + // Run a PPOLL on the file descriptor (waiting for the block to become available) + efdHasEvent, errno := s.eventHandler.Poll(unix.POLLIN | unix.POLLERR) + + // If an event was received, ensure that the respective error / code is returned + // immediately + if efdHasEvent { + pktType, err = capture.PacketUnknown, s.handleEvent() + return + } + + // Handle potential PPOLL errors + if errno != 0 { + if errno == unix.EINTR { + continue + } + pktType, err = capture.PacketUnknown, handlePollError(errno) + return + } + + // Handle rare cases of runaway packets (this call will advance to the next block + // as a side effect in case of a detection) + if s.hasRunawayBlock() { + continue + } + } + + // Set the position of the first packet in this block and jump to end + pktHdr.ppos = pktHdr.offsetToFirstPkt() + +finalize: + + // Apply filter (if any) + if s.filter > 0 && s.filter&pktHdr.data[pktHdr.ppos+58] != 0 { + goto retry + } + + // Parse the V3 TPacketHeader and the first byte of the payload + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktNet) + + // Extract the payload (zero-copy) & parameters + return unsafe.Slice(&pktHdr.data[pos], hdr.snaplen-s.ipLayerOffsetNum), + pktHdr.data[pktHdr.ppos+58], + hdr.pktLen, nil +} diff --git a/capture/afpacket/afring/ring.go b/capture/afpacket/afring/ring.go new file mode 100644 index 0000000..6b94e8f --- /dev/null +++ b/capture/afpacket/afring/ring.go @@ -0,0 +1,33 @@ +//go:build linux +// +build linux + +package afring + +import "golang.org/x/sys/unix" + +type ringBuffer struct { + ring []byte + + tpReq tPacketRequest + curTPacketHeader *tPacketHeader + offset int +} + +func (b *ringBuffer) releaseAndAdvance() { + b.curTPacketHeader.setStatus(unix.TP_STATUS_KERNEL) + b.offset = (b.offset + 1) % int(b.tpReq.blockNr) +} + +func (b *ringBuffer) loadTPacketHeader() { + b.curTPacketHeader.data = b.ring[b.offset*int(b.tpReq.blockSize):] +} + +func (b *ringBuffer) hasRunawayBlock() bool { + if b.curTPacketHeader.getStatus()&unix.TP_STATUS_COPY != 0 { + b.releaseAndAdvance() + b.curTPacketHeader.data = nil + return true + } + + return false +} diff --git a/capture/afpacket/afring/tpacket.go b/capture/afpacket/afring/tpacket.go index 76acf2e..af82f88 100644 --- a/capture/afpacket/afring/tpacket.go +++ b/capture/afpacket/afring/tpacket.go @@ -8,7 +8,6 @@ import ( "math" "unsafe" - "github.com/fako1024/slimcap/capture" "golang.org/x/sys/unix" ) @@ -98,12 +97,11 @@ type tPacketHeader struct { // Note: The struct parses only the relevant portions of the header, the rest is // skipped / ignored by means of dummy elements of the correct in-memory size type tPacketHeaderV3 struct { - snaplen uint32 // 12-16 - pktLen uint32 // 16-20 - _ uint32 // skip - pktPos uint16 // 24-26 - _ [16]uint16 // skip - pktType byte // 58 + snaplen uint32 // 12-16 + pktLen uint32 // 16-20 + _ uint32 // skip + pktMac uint16 // 24-26 + pktNet uint16 // 26-28 } func (t tPacketHeader) offsetToFirstPkt() uint32 { @@ -114,74 +112,12 @@ func (t tPacketHeader) nextOffset() uint32 { return *(*uint32)(unsafe.Pointer(&t.data[t.ppos])) // #nosec G103 } -func (t tPacketHeader) pktLenPut(data []byte) { +func (t tPacketHeader) pktLenCopy(data []byte) { copy(data, t.data[t.ppos+16:t.ppos+20]) } -func (t tPacketHeader) packetType() byte { - return t.data[t.ppos+58] -} - -func (t tPacketHeader) payloadZeroCopy(offset uint32) ([]byte, byte, uint32) { - - // Parse the V3 TPacketHeader and the first byte of the payload - hdr := (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 - pos := t.ppos + offset + uint32(hdr.pktPos) - - // Return the payload / IP layer subslice & heeader parameters - return t.data[pos : pos+hdr.snaplen], - hdr.pktType, - hdr.pktLen -} - -func (t tPacketHeader) packetPut(data capture.Packet, ipLayerOffset byte) capture.Packet { - - // Parse the V3 TPacketHeader, the first byte of the payload and snaplen - hdr := (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 - pos := t.ppos + uint32(hdr.pktPos) - snapLen := int(hdr.snaplen) - - // Allocate new capture.Packet if no buffer was provided - if data == nil { - data = make(capture.Packet, capture.PacketHdrOffset+snapLen) - } - - // Extract / copy all required data / header parameters - t.pktLenPut(data[2:6]) - data[0] = hdr.pktType - data[1] = ipLayerOffset - copy(data[6:], t.data[pos:pos+hdr.snaplen]) - - // Ensure correct packet length - if snapLen+capture.PacketHdrOffset < len(data) { - data = data[:capture.PacketHdrOffset+snapLen] - } - - return data -} - -func (t tPacketHeader) payloadPut(data []byte, offset uint32) ([]byte, capture.PacketType, uint32) { - - // Parse the V3 TPacketHeader, the first byte of the payload and snaplen - hdr := (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 - pos := t.ppos + offset + uint32(hdr.pktPos) - snapLen := int(hdr.snaplen) - - // Allocate new payload / IP layer if no buffer was provided - if data == nil { - data = make([]byte, snapLen) - } - - // Copy payload / IP layer - copy(data, t.data[pos:pos+hdr.snaplen]) - - // Ensure correct data length - if effectiveSnapLen := snapLen - int(offset); effectiveSnapLen < len(data) { - data = data[:effectiveSnapLen] - } - - // Return payload / IP layer & header parameters - return data, hdr.pktType, hdr.pktLen +func (t tPacketHeader) parseHeader() *tPacketHeaderV3 { + return (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 } ////////////////////////////////////////////////////////////////////////////////////////////////// From c8c01c4a859f99caef4eadfdfce25f4972c9dc7a Mon Sep 17 00:00:00 2001 From: fako1024 Date: Mon, 18 Sep 2023 09:45:28 +0200 Subject: [PATCH 13/15] Use unified ZeroCopy calls for NextPacketFn --- capture/afpacket/afring/afring.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/capture/afpacket/afring/afring.go b/capture/afpacket/afring/afring.go index b9092e8..1f0d910 100644 --- a/capture/afpacket/afring/afring.go +++ b/capture/afpacket/afring/afring.go @@ -268,8 +268,8 @@ func (s *Source) NextPacketFn(fn func(payload []byte, totalLen uint32, pktType c pos := pktHdr.ppos + uint32(hdr.pktMac) // #nosec G103 - return fn(pktHdr.data[pos:pos+*(*uint32)(unsafe.Pointer(&pktHdr.data[pktHdr.ppos+12]))], - *(*uint32)(unsafe.Pointer(&pktHdr.data[pktHdr.ppos+16])), + return fn(unsafe.Slice(&pktHdr.data[pos], hdr.snaplen), + hdr.pktLen, pktHdr.data[pktHdr.ppos+58], s.ipLayerOffset) } From 71fe06b707b5e9b5b47b0ba1c6262ff3f7151f24 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Mon, 18 Sep 2023 10:37:44 +0200 Subject: [PATCH 14/15] Fix incorrect return value of NextIPPacketZeroCopy violating interface --- capture/afpacket/afring/afring_zerocopy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capture/afpacket/afring/afring_zerocopy.go b/capture/afpacket/afring/afring_zerocopy.go index 7a5c6c9..365ee7d 100644 --- a/capture/afpacket/afring/afring_zerocopy.go +++ b/capture/afpacket/afring/afring_zerocopy.go @@ -94,7 +94,7 @@ finalize: // Procedurally, the method extracts the next packet from either the current block or advances to the next // one (fetching / returning its first packet). Using the offset parameter it supports extraction of // both the full payload or the IP layer only. -func (s *Source) NextIPPacketZeroCopy() (ipLayer []byte, pktType capture.PacketType, pktLen uint32, err error) { +func (s *Source) NextIPPacketZeroCopy() (ipLayer capture.IPLayer, pktType capture.PacketType, pktLen uint32, err error) { retry: pktHdr := s.curTPacketHeader From 8ad026e2983ce0e80193de2b0bef1b031d0b82ed Mon Sep 17 00:00:00 2001 From: fako1024 Date: Mon, 25 Sep 2023 10:29:11 +0200 Subject: [PATCH 15/15] Fix wording of zero-copy function comment --- capture/afpacket/afring/afring_zerocopy.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/capture/afpacket/afring/afring_zerocopy.go b/capture/afpacket/afring/afring_zerocopy.go index 365ee7d..fe7506f 100644 --- a/capture/afpacket/afring/afring_zerocopy.go +++ b/capture/afpacket/afring/afring_zerocopy.go @@ -10,8 +10,7 @@ import ( // NextPayloadZeroCopy receives the raw payload of the next packet from the source and returns it. The operation is blocking. // The returned payload provides direct zero-copy access to the underlying data source (e.g. a ring buffer). // Procedurally, the method extracts the next packet from either the current block or advances to the next -// one (fetching / returning its first packet). Using the offset parameter it supports extraction of -// both the full payload or the IP layer only. +// one (fetching / returning its first packet). func (s *Source) NextPayloadZeroCopy() (payload []byte, pktType capture.PacketType, pktLen uint32, err error) { retry: @@ -92,8 +91,7 @@ finalize: // NextIPPacketZeroCopy receives the IP layer of the next packet from the source and returns it. The operation is blocking. // The returned IPLayer provides direct zero-copy access to the underlying data source (e.g. a ring buffer). // Procedurally, the method extracts the next packet from either the current block or advances to the next -// one (fetching / returning its first packet). Using the offset parameter it supports extraction of -// both the full payload or the IP layer only. +// one (fetching / returning its first packet IP layer). func (s *Source) NextIPPacketZeroCopy() (ipLayer capture.IPLayer, pktType capture.PacketType, pktLen uint32, err error) { retry: