diff --git a/capture/afpacket/afring/afring.go b/capture/afpacket/afring/afring.go index 0413b7b..1f0d910 100644 --- a/capture/afpacket/afring/afring.go +++ b/capture/afpacket/afring/afring.go @@ -28,18 +28,6 @@ const ( DefaultSnapLen = (1 << 16) // DefaultSnapLen : 64 kiB ) -type ringBuffer struct { - ring []byte - - tpReq tPacketRequest - curTPacketHeader *tPacketHeader - offset int -} - -func (b *ringBuffer) nextTPacketHeader() { - b.curTPacketHeader.data = b.ring[b.offset*int(b.tpReq.blockSize):] -} - // Source denotes an AF_PACKET capture source making use of a ring buffer type Source struct { eventHandler *event.Handler @@ -50,7 +38,8 @@ type Source struct { isPromisc bool link *link.Link - unblocked bool + ipLayerOffsetNum uint32 + filter byte ringBuffer sync.Mutex @@ -87,7 +76,9 @@ func NewSourceFromLink(link *link.Link, options ...Option) (*Source, error) { nBlocks: tPacketDefaultBlockNr, ipLayerOffset: link.Type.IPHeaderOffset(), link: link, + filter: link.FilterMask(), } + src.ipLayerOffsetNum = uint32(src.ipLayerOffset) for _, opt := range options { opt(src) @@ -139,121 +130,126 @@ func (s *Source) NewPacket() capture.Packet { // NextPacket receives the next packet from the source and returns it. The operation is blocking. In // case a non-nil "buffer" Packet is provided it will be populated with the data (and returned). The // buffer packet can be reused. Otherwise a new Packet is allocated. 
-func (s *Source) NextPacket(pBuf capture.Packet) (capture.Packet, error) { +func (s *Source) NextPacket(pBuf capture.Packet) (pkt capture.Packet, err error) { - if err := s.nextPacket(); err != nil { - return nil, err + if err = s.nextPacket(); err != nil { + return } - var ( - data capture.Packet - snapLen = int(s.curTPacketHeader.snapLen()) - ) - // If a buffer was provided, et the correct length of the buffer and populate it - // Otherwise, allocate a new Packet - if pBuf != nil { - data = pBuf[:cap(pBuf)] + pktHdr := s.curTPacketHeader + + // Parse the V3 TPacketHeader, the first byte of the payload and snaplen + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktMac) + effectiveSnapLen := capture.PacketHdrOffset + int(hdr.snaplen) + + // If a buffer was provided, extend it to maximum capacity + if pBuf == nil { + + // Allocate new capture.Packet if no buffer was provided + pkt = make(capture.Packet, effectiveSnapLen) } else { - data = make(capture.Packet, capture.PacketHdrOffset+snapLen) + pkt = pBuf[:cap(pBuf)] } - // Populate the packet - data[0] = s.curTPacketHeader.packetType() - data[1] = s.ipLayerOffset - s.curTPacketHeader.pktLenPut(data[2:6]) - s.curTPacketHeader.payloadCopyPutAtOffset(data[6:], 0, uint32(snapLen)) - if snapLen+capture.PacketHdrOffset < len(data) { - data = data[:capture.PacketHdrOffset+snapLen] + // Extract / copy all required data / header parameters + pktHdr.pktLenCopy(pkt[2:capture.PacketHdrOffset]) + pkt[0] = pktHdr.data[pktHdr.ppos+58] + pkt[1] = s.ipLayerOffset + copy(pkt[capture.PacketHdrOffset:], pktHdr.data[pos:pos+hdr.snaplen]) + + // Ensure correct packet length + if effectiveSnapLen < len(pkt) { + pkt = pkt[:effectiveSnapLen] } - return data, nil + return } // NextPayload receives the raw payload of the next packet from the source and returns it. The operation is blocking. // In case a non-nil "buffer" byte slice / payload is provided it will be populated with the data (and returned). 
// The buffer can be reused. Otherwise a new byte slice / payload is allocated. -func (s *Source) NextPayload(pBuf []byte) ([]byte, capture.PacketType, uint32, error) { +func (s *Source) NextPayload(pBuf []byte) (payload []byte, pktType capture.PacketType, pktLen uint32, err error) { - if err := s.nextPacket(); err != nil { - return nil, capture.PacketUnknown, 0, err + if err = s.nextPacket(); err != nil { + pktType = capture.PacketUnknown + return } - var ( - data []byte - snapLen = s.curTPacketHeader.snapLen() - ) - // If a buffer was provided, et the correct length of the buffer and populate it - // Otherwise, allocate a new byte slice - then populate the packet + pktHdr := s.curTPacketHeader + + // Parse the V3 TPacketHeader, the first byte of the payload and snaplen + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktMac) + snapLen := int(hdr.snaplen) + + // If a buffer was provided, extend it to maximum capacity if pBuf != nil { - data = s.curTPacketHeader.payloadNoCopyAtOffset(0, snapLen) + payload = pBuf[:cap(pBuf)] } else { - data = make([]byte, snapLen) - s.curTPacketHeader.payloadCopyPutAtOffset(data, 0, snapLen) - } - if int(snapLen) < len(data) { - data = data[:snapLen] + // Allocate new capture.Packet if no buffer was provided + payload = make([]byte, snapLen) } - return data, s.curTPacketHeader.packetType(), s.curTPacketHeader.pktLen(), nil -} - -// NextPayloadZeroCopy receives the raw payload of the next packet from the source and returns it. The operation is blocking. -// The returned payload provides direct zero-copy access to the underlying data source (e.g. a ring buffer). 
-func (s *Source) NextPayloadZeroCopy() ([]byte, capture.PacketType, uint32, error) { + // Copy payload / IP layer + copy(payload, pktHdr.data[pos:pos+hdr.snaplen]) - if err := s.nextPacket(); err != nil { - return nil, capture.PacketUnknown, 0, err + // Ensure correct data length + if snapLen < len(payload) { + payload = payload[:snapLen] } - return s.curTPacketHeader.payloadNoCopyAtOffset(0, s.curTPacketHeader.snapLen()), - s.curTPacketHeader.packetType(), - s.curTPacketHeader.pktLen(), - nil + // Populate the payload / buffer & parameters + pktType, pktLen = pktHdr.data[pktHdr.ppos+58], hdr.pktLen + + return } // NextIPPacket receives the IP layer of the next packet from the source and returns it. The operation is blocking. // In case a non-nil "buffer" IPLayer is provided it will be populated with the data (and returned). // The buffer can be reused. Otherwise a new IPLayer is allocated. -func (s *Source) NextIPPacket(pBuf capture.IPLayer) (capture.IPLayer, capture.PacketType, uint32, error) { +func (s *Source) NextIPPacket(pBuf capture.IPLayer) (ipLayer capture.IPLayer, pktType capture.PacketType, pktLen uint32, err error) { - if err := s.nextPacket(); err != nil { - return nil, capture.PacketUnknown, 0, err + if err = s.nextPacket(); err != nil { + pktType = capture.PacketUnknown + return + } + + pktHdr := s.curTPacketHeader + + // Parse the V3 TPacketHeader and the first byte of the payload + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktNet) + + // Adjust effective snaplen (subtracting any potential mac layer) + effectiveSnapLen := hdr.snaplen + if s.ipLayerOffsetNum > 0 { + effectiveSnapLen -= s.ipLayerOffsetNum } - var ( - data capture.IPLayer - snapLen = s.curTPacketHeader.snapLen() - ) + snapLen := int(effectiveSnapLen) - // If a buffer was provided, et the correct length of the buffer and populate it - // Otherwise, allocate a new IPLayer + // If a buffer was provided, extend it to maximum capacity if pBuf != nil { - data = 
pBuf[:cap(pBuf)] + ipLayer = pBuf[:cap(pBuf)] } else { - data = make(capture.IPLayer, snapLen) - } - // Populate the packet - s.curTPacketHeader.payloadCopyPutAtOffset(data, uint32(s.ipLayerOffset), snapLen) - if ipLayerSnaplen := snapLen - uint32(s.ipLayerOffset); int(ipLayerSnaplen) < len(data) { - data = data[:ipLayerSnaplen] + // Allocate new capture.Packet if no buffer was provided + ipLayer = make([]byte, snapLen) } - return data, s.curTPacketHeader.packetType(), s.curTPacketHeader.pktLen(), nil -} - -// NextIPPacketZeroCopy receives the IP layer of the next packet from the source and returns it. The operation is blocking. -// The returned IPLayer provides direct zero-copy access to the underlying data source (e.g. a ring buffer). -func (s *Source) NextIPPacketZeroCopy() (capture.IPLayer, capture.PacketType, uint32, error) { + // Copy payload / IP layer + copy(ipLayer, pktHdr.data[pos:pos+effectiveSnapLen]) - if err := s.nextPacket(); err != nil { - return nil, capture.PacketUnknown, 0, err + // Ensure correct data length + if snapLen < len(ipLayer) { + ipLayer = ipLayer[:snapLen] } - return s.curTPacketHeader.payloadNoCopyAtOffset(uint32(s.ipLayerOffset), s.curTPacketHeader.snapLen()), - s.curTPacketHeader.packetType(), - s.curTPacketHeader.pktLen(), - nil + // Populate the payload / buffer & parameters + pktType, pktLen = pktHdr.data[pktHdr.ppos+58], hdr.pktLen + + return } // NextPacketFn executes the provided function on the next packet received on the source. 
If possible, the @@ -265,9 +261,16 @@ func (s *Source) NextPacketFn(fn func(payload []byte, totalLen uint32, pktType c return err } - return fn(s.curTPacketHeader.payloadNoCopyAtOffset(0, s.curTPacketHeader.snapLen()), - s.curTPacketHeader.pktLen(), - s.curTPacketHeader.packetType(), + pktHdr := s.curTPacketHeader + + // Parse the V3 TPacketHeader and the first byte of the payload + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktMac) + + // #nosec G103 + return fn(unsafe.Slice(&pktHdr.data[pos], hdr.snaplen), + hdr.pktLen, + pktHdr.data[pktHdr.ppos+58], s.ipLayerOffset) } @@ -329,77 +332,70 @@ func (s *Source) Link() *link.Link { return s.link } +// nextPacket provides access to the next packet from either the current block or advances to the next +// one (fetching its first packet). func (s *Source) nextPacket() error { - // If the current TPacketHeader does not contain any more packets (or is uninitialized) - // fetch a new one from the ring buffer -fetch: - if s.curTPacketHeader.data == nil || s.unblocked { - if !s.unblocked { - s.nextTPacketHeader() +retry: + pktHdr := s.curTPacketHeader + + // If there is an active block, attempt to simply consume a packet from it + if pktHdr.data != nil { + + // If there are more packets remaining (i.e. there is a non-zero next offset), advance + // the current position. + // According to https://github.com/torvalds/linux/blame/master/net/packet/af_packet.c#L811 the + // tp_next_offset field is guaranteed to be zero for the final packet of the block. In addition, + // it cannot be zero otherwise (because that would be an invalid block). 
+ if nextPos := pktHdr.nextOffset(); nextPos != 0 { + + // Update position of next packet and jump to the end + pktHdr.ppos += nextPos + goto finalize } - for s.curTPacketHeader.getStatus()&unix.TP_STATUS_USER == 0 || s.unblocked { - // Unset the bypass marker - if s.unblocked { - s.unblocked = false - } + // If there is no next offset, release the TPacketHeader to the kernel and move on to the next block + s.releaseAndAdvance() + } - // Run a PPOLL on the file descriptor, fetching a new block into the ring buffer - efdHasEvent, errno := s.eventHandler.Poll(unix.POLLIN | unix.POLLERR) + // Load the data for the block + s.loadTPacketHeader() - // If an event was received, ensure that the respective error is returned - // immediately (setting the `unblocked` marker to bypass checks done before - // upon next entry into this method) - if efdHasEvent { - return s.handleEvent() - } + // Check if the block is free to access in userland + for pktHdr.getStatus()&unix.TP_STATUS_USER == 0 { - // Handle errors - if errno != 0 { - if errno == unix.EINTR { - continue - } - if errno == unix.EBADF { - return capture.ErrCaptureStopped - } - return fmt.Errorf("error polling for next packet: %w (errno %d)", errno, int(errno)) - } + // Run a PPOLL on the file descriptor (waiting for the block to become available) + efdHasEvent, errno := s.eventHandler.Poll(unix.POLLIN | unix.POLLERR) - // Handle rare cases of runaway packets - if s.curTPacketHeader.getStatus()&unix.TP_STATUS_COPY != 0 { - s.curTPacketHeader.setStatus(unix.TP_STATUS_KERNEL) - s.offset = (s.offset + 1) % int(s.tpReq.blockNr) - s.nextTPacketHeader() + // If an event was received, ensure that the respective error / code is returned + // immediately + if efdHasEvent { + return s.handleEvent() + } + // Handle potential PPOLL errors + if errno != 0 { + if errno == unix.EINTR { continue } + return handlePollError(errno) } - // After fetching a new TPacketHeader, set the position of the first packet and the number of packets 
- // in this TPacketHeader - s.curTPacketHeader.ppos = s.curTPacketHeader.offsetToFirstPkt() - s.curTPacketHeader.nPktsLeft = s.curTPacketHeader.nPkts() - } else { - - // If there is no next offset, release the TPacketHeader to the kernel and fetch a new one - nextPos := s.curTPacketHeader.nextOffset() - if s.curTPacketHeader.nPktsLeft == 0 { - s.curTPacketHeader.setStatus(unix.TP_STATUS_KERNEL) - s.offset = (s.offset + 1) % int(s.tpReq.blockNr) - s.curTPacketHeader.data = nil - goto fetch + // Handle rare cases of runaway packets (this call will advance to the next block + // as a side effect in case of a detection) + if s.hasRunawayBlock() { + continue } - - // Update position of next packet - s.curTPacketHeader.ppos += nextPos } - s.curTPacketHeader.nPktsLeft-- + // Set the position of the first packet in this block and jump to end + pktHdr.ppos = pktHdr.offsetToFirstPkt() + +finalize: // Apply filter (if any) - if filter := s.link.FilterMask(); filter > 0 && filter&s.curTPacketHeader.packetType() != 0 { - goto fetch + if s.filter > 0 && s.filter&pktHdr.data[pktHdr.ppos+58] != 0 { + goto retry } return nil @@ -413,9 +409,10 @@ func (s *Source) handleEvent() error { return fmt.Errorf("error reading event: %w", err) } - // Set the bypass marker to allow for re-entry in nextPacket() where we left off if + // Unset the current block data to allow for re-entry in nextPacket[ZeroCopy]() where we left off if // required (e.g. 
on ErrCaptureUnblock) - s.unblocked = true + s.curTPacketHeader.data = nil + if efdData[7] > 0 { return capture.ErrCaptureStopped } @@ -452,3 +449,10 @@ func setupRingBuffer(sd socket.FileDescriptor, tPacketReq tPacketRequest) ([]byt return buf, eventFD, nil } + +func handlePollError(errno unix.Errno) error { + if errno == unix.EBADF { + return capture.ErrCaptureStopped + } + return fmt.Errorf("error polling for next packet: %w (errno %d)", errno, int(errno)) +} diff --git a/capture/afpacket/afring/afring_mock.go b/capture/afpacket/afring/afring_mock.go index 2da6270..49e3e2b 100644 --- a/capture/afpacket/afring/afring_mock.go +++ b/capture/afpacket/afring/afring_mock.go @@ -22,6 +22,16 @@ const ( blockStatusPollInterval = 10 * time.Millisecond ) +type tPacketHeaderV3Mock struct { + snaplen uint32 // 12-16 + pktLen uint32 // 16-20 + _ uint32 // skip + pktMac uint16 // 24-26 + pktNet uint16 // 26-28 + _ [15]uint16 // skip + pktType byte // 58 +} + // MockSource denotes a fully mocked ring buffer source, behaving just like one // Since it wraps a regular Source, it can be used as a stand-in replacement without any further // code modifications: @@ -62,6 +72,7 @@ func NewMockSource(_ string, options ...Option) (*MockSource, error) { }, eventHandler: mockHandler, } + src.ipLayerOffsetNum = uint32(src.ipLayerOffset) for _, opt := range options { opt(src) @@ -129,11 +140,14 @@ func (m *MockSource) addPacket(payload []byte, totalLen uint32, pktType, ipLayer block := m.ringBuffer.ring[thisBlock*m.blockSize : thisBlock*m.blockSize+m.blockSize] - *(*uint32)(unsafe.Pointer(&block[m.curBlockPos+12])) = uint32(m.snapLen) // #nosec: G103 // snapLen - *(*uint32)(unsafe.Pointer(&block[m.curBlockPos+16])) = totalLen // #nosec: G103 // totalLen - *(*uint32)(unsafe.Pointer(&block[m.curBlockPos+24])) = uint32(mac) // #nosec: G103 // mac - block[m.curBlockPos+58] = pktType // pktType - copy(block[m.curBlockPos+mac:m.curBlockPos+mac+m.snapLen], payload) // payload + 
*(*tPacketHeaderV3Mock)(unsafe.Pointer(&block[m.curBlockPos+12])) = tPacketHeaderV3Mock{ + snaplen: uint32(m.snapLen), + pktLen: totalLen, + pktMac: mac, + pktNet: mac + uint16(m.ipLayerOffset), + pktType: pktType, + } // #nosec: G103 + copy(block[m.curBlockPos+mac:m.curBlockPos+mac+m.snapLen], payload) // payload // Ensure that there is no "stray" nextOffset set from a previous perusal of this ring buffer block which // might remain in case the block is finalized diff --git a/capture/afpacket/afring/afring_mock_nodrain.go b/capture/afpacket/afring/afring_mock_nodrain.go index 706d7a5..ae9007f 100644 --- a/capture/afpacket/afring/afring_mock_nodrain.go +++ b/capture/afpacket/afring/afring_mock_nodrain.go @@ -5,6 +5,7 @@ package afring import ( "errors" + "runtime" "sync" "sync/atomic" "time" @@ -68,6 +69,8 @@ func (m *MockSourceNoDrain) Run(releaseInterval time.Duration) (<-chan error, er m.wgRunning.Add(1) go func(errs chan error) { + // Minimize scheduler overhead by locking this goroutine to the current thread + runtime.LockOSThread() defer func() { close(errs) m.wgRunning.Done() diff --git a/capture/afpacket/afring/afring_mock_test.go b/capture/afpacket/afring/afring_mock_test.go index 1527c57..b06aa15 100644 --- a/capture/afpacket/afring/afring_mock_test.go +++ b/capture/afpacket/afring/afring_mock_test.go @@ -385,114 +385,127 @@ func BenchmarkCaptureMethods(b *testing.B) { 6, []byte{1, 2}, 0, 128) require.Nil(b, err) - // Setup a mock source - mockSrc, err := NewMockSourceNoDrain("mock", - CaptureLength(link.CaptureLengthMinimalIPv4Transport), - Promiscuous(false), - ) - require.Nil(b, err) - - for mockSrc.CanAddPackets() { - require.Nil(b, mockSrc.AddPacket(testPacket)) - } - _, err = mockSrc.Run(time.Microsecond) - require.Nil(b, err) - - b.Run("NextPacket", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - p, _ := mockSrc.NextPacket(nil) - _ = p - } - }) - - b.Run("NextPacketInPlace", func(b *testing.B) { - var p capture.Packet = 
mockSrc.NewPacket() - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - p, _ = mockSrc.NextPacket(p) - _ = p - } - }) - - b.Run("NextPayload", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextPayload(nil) - _ = p - _ = pktType - _ = totalLen + for _, benchConfig := range []struct { + blockSize int + nBlocks int + blockExpiry time.Duration + }{ + {10 * 1024 * 1024, 4, time.Microsecond}, + {10 * 1024, 512, 10 * time.Nanosecond}, + } { + + // Setup a mock source + mockSrc, err := NewMockSourceNoDrain("mock", + CaptureLength(link.CaptureLengthMinimalIPv4Transport), + BufferSize(benchConfig.blockSize, benchConfig.nBlocks), + Promiscuous(false), + ) + require.Nil(b, err) + + for mockSrc.CanAddPackets() { + require.Nil(b, mockSrc.AddPacket(testPacket)) } - }) + _, err = mockSrc.Run(benchConfig.blockExpiry) + require.Nil(b, err) + + b.Run(fmt.Sprintf("NextPacket_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + p, _ := mockSrc.NextPacket(nil) + _ = p + } + }) - b.Run("NextPayloadInPlace", func(b *testing.B) { - pkt := mockSrc.NewPacket() - var p []byte = pkt.Payload() - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextPayload(p) - _ = p - _ = pktType - _ = totalLen - } - }) + b.Run(fmt.Sprintf("NextPacketInPlace_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { + var p capture.Packet = mockSrc.NewPacket() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + p, _ = mockSrc.NextPacket(p) + _ = p + } + }) - b.Run("NextPayloadZeroCopy", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextPayloadZeroCopy() - _ = p - _ = pktType - _ = totalLen - } - }) + b.Run(fmt.Sprintf("NextPayload_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { + 
b.ReportAllocs() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextPayload(nil) + _ = p + _ = pktType + _ = totalLen + } + }) - b.Run("NextIPPacket", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextIPPacket(nil) - _ = p - _ = pktType - _ = totalLen - } - }) + b.Run(fmt.Sprintf("NextPayloadInPlace_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { + pkt := mockSrc.NewPacket() + var p []byte = pkt.Payload() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextPayload(p) + _ = p + _ = pktType + _ = totalLen + } + }) - b.Run("NextIPPacketInPlace", func(b *testing.B) { - pkt := mockSrc.NewPacket() - var p capture.IPLayer = pkt.IPLayer() - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextIPPacket(p) - _ = p - _ = pktType - _ = totalLen - } - }) + b.Run(fmt.Sprintf("NextPayloadZeroCopy_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextPayloadZeroCopy() + _ = p + _ = pktType + _ = totalLen + } + }) - b.Run("NextIPPacketZeroCopy", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - p, pktType, totalLen, _ := mockSrc.NextIPPacketZeroCopy() - _ = p - _ = pktType - _ = totalLen - } - }) + b.Run(fmt.Sprintf("NextIPPacket_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextIPPacket(nil) + _ = p + _ = pktType + _ = totalLen + } + }) - b.Run("NextPacketFn", func(b *testing.B) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - _ = mockSrc.NextPacketFn(func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) error { - _ = payload + b.Run(fmt.Sprintf("NextIPPacketInPlace_%dkiBx%d", 
benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { + pkt := mockSrc.NewPacket() + var p capture.IPLayer = pkt.IPLayer() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextIPPacket(p) + _ = p + _ = pktType _ = totalLen + } + }) + + b.Run(fmt.Sprintf("NextIPPacketZeroCopy_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + p, pktType, totalLen, _ := mockSrc.NextIPPacketZeroCopy() + _ = p _ = pktType - return nil - }) - } - }) + _ = totalLen + } + }) + + b.Run(fmt.Sprintf("NextPacketFn_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = mockSrc.NextPacketFn(func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) error { + _ = payload + _ = totalLen + _ = pktType + return nil + }) + } + }) + + require.Nil(b, mockSrc.Close()) + } } func testCaptureMethods(t *testing.T, fn func(t *testing.T, _ *MockSource, _, _ uint16)) { diff --git a/capture/afpacket/afring/afring_zerocopy.go b/capture/afpacket/afring/afring_zerocopy.go new file mode 100644 index 0000000..fe7506f --- /dev/null +++ b/capture/afpacket/afring/afring_zerocopy.go @@ -0,0 +1,169 @@ +package afring + +import ( + "unsafe" + + "github.com/fako1024/slimcap/capture" + "golang.org/x/sys/unix" +) + +// NextPayloadZeroCopy receives the raw payload of the next packet from the source and returns it. The operation is blocking. +// The returned payload provides direct zero-copy access to the underlying data source (e.g. a ring buffer). +// Procedurally, the method extracts the next packet from either the current block or advances to the next +// one (fetching / returning its first packet). 
+func (s *Source) NextPayloadZeroCopy() (payload []byte, pktType capture.PacketType, pktLen uint32, err error) { + +retry: + pktHdr := s.curTPacketHeader + + // If there is an active block, attempt to simply consume a packet from it + if pktHdr.data != nil { + + // If there are more packets remaining (i.e. there is a non-zero next offset), advance + // the current position. + // According to https://github.com/torvalds/linux/blame/master/net/packet/af_packet.c#L811 the + // tp_next_offset field is guaranteed to be zero for the final packet of the block. In addition, + // it cannot be zero otherwise (because that would be an invalid block). + if nextPos := pktHdr.nextOffset(); nextPos != 0 { + + // Update position of next packet and jump to the end + pktHdr.ppos += nextPos + goto finalize + } + + // If there is no next offset, release the TPacketHeader to the kernel and move on to the next block + s.releaseAndAdvance() + } + + // Load the data for the block + s.loadTPacketHeader() + + // Check if the block is free to access in userland + for pktHdr.getStatus()&unix.TP_STATUS_USER == 0 { + + // Run a PPOLL on the file descriptor (waiting for the block to become available) + efdHasEvent, errno := s.eventHandler.Poll(unix.POLLIN | unix.POLLERR) + + // If an event was received, ensure that the respective error / code is returned + // immediately + if efdHasEvent { + pktType, err = capture.PacketUnknown, s.handleEvent() + return + } + + // Handle potential PPOLL errors + if errno != 0 { + if errno == unix.EINTR { + continue + } + pktType, err = capture.PacketUnknown, handlePollError(errno) + return + } + + // Handle rare cases of runaway packets (this call will advance to the next block + // as a side effect in case of a detection) + if s.hasRunawayBlock() { + continue + } + } + + // Set the position of the first packet in this block and jump to end + pktHdr.ppos = pktHdr.offsetToFirstPkt() + +finalize: + + // Apply filter (if any) + if s.filter > 0 && 
s.filter&pktHdr.data[pktHdr.ppos+58] != 0 { + goto retry + } + + // Parse the V3 TPacketHeader and the first byte of the payload + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktMac) + + // Return the payload / IP layer subslice & heeader parameters + return unsafe.Slice(&pktHdr.data[pos], hdr.snaplen), + pktHdr.data[pktHdr.ppos+58], + hdr.pktLen, nil + +} + +// NextIPPacketZeroCopy receives the IP layer of the next packet from the source and returns it. The operation is blocking. +// The returned IPLayer provides direct zero-copy access to the underlying data source (e.g. a ring buffer). +// Procedurally, the method extracts the next packet from either the current block or advances to the next +// one (fetching / returning its first packet IP layer). +func (s *Source) NextIPPacketZeroCopy() (ipLayer capture.IPLayer, pktType capture.PacketType, pktLen uint32, err error) { + +retry: + pktHdr := s.curTPacketHeader + + // If there is an active block, attempt to simply consume a packet from it + if pktHdr.data != nil { + + // If there are more packets remaining (i.e. there is a non-zero next offset), advance + // the current position. + // According to https://github.com/torvalds/linux/blame/master/net/packet/af_packet.c#L811 the + // tp_next_offset field is guaranteed to be zero for the final packet of the block. In addition, + // it cannot be zero otherwise (because that would be an invalid block). 
+ if nextPos := pktHdr.nextOffset(); nextPos != 0 { + + // Update position of next packet and jump to the end + pktHdr.ppos += nextPos + goto finalize + } + + // If there is no next offset, release the TPacketHeader to the kernel and move on to the next block + s.releaseAndAdvance() + } + + // Load the data for the block + s.loadTPacketHeader() + + // Check if the block is free to access in userland + for pktHdr.getStatus()&unix.TP_STATUS_USER == 0 { + + // Run a PPOLL on the file descriptor (waiting for the block to become available) + efdHasEvent, errno := s.eventHandler.Poll(unix.POLLIN | unix.POLLERR) + + // If an event was received, ensure that the respective error / code is returned + // immediately + if efdHasEvent { + pktType, err = capture.PacketUnknown, s.handleEvent() + return + } + + // Handle potential PPOLL errors + if errno != 0 { + if errno == unix.EINTR { + continue + } + pktType, err = capture.PacketUnknown, handlePollError(errno) + return + } + + // Handle rare cases of runaway packets (this call will advance to the next block + // as a side effect in case of a detection) + if s.hasRunawayBlock() { + continue + } + } + + // Set the position of the first packet in this block and jump to end + pktHdr.ppos = pktHdr.offsetToFirstPkt() + +finalize: + + // Apply filter (if any) + if s.filter > 0 && s.filter&pktHdr.data[pktHdr.ppos+58] != 0 { + goto retry + } + + // Parse the V3 TPacketHeader and the first byte of the payload + hdr := pktHdr.parseHeader() + pos := pktHdr.ppos + uint32(hdr.pktNet) + + // Extract the payload (zero-copy) & parameters + return unsafe.Slice(&pktHdr.data[pos], hdr.snaplen-s.ipLayerOffsetNum), + pktHdr.data[pktHdr.ppos+58], + hdr.pktLen, nil +} diff --git a/capture/afpacket/afring/ring.go b/capture/afpacket/afring/ring.go new file mode 100644 index 0000000..6b94e8f --- /dev/null +++ b/capture/afpacket/afring/ring.go @@ -0,0 +1,33 @@ +//go:build linux +// +build linux + +package afring + +import "golang.org/x/sys/unix" + +type 
ringBuffer struct { + ring []byte + + tpReq tPacketRequest + curTPacketHeader *tPacketHeader + offset int +} + +func (b *ringBuffer) releaseAndAdvance() { + b.curTPacketHeader.setStatus(unix.TP_STATUS_KERNEL) + b.offset = (b.offset + 1) % int(b.tpReq.blockNr) +} + +func (b *ringBuffer) loadTPacketHeader() { + b.curTPacketHeader.data = b.ring[b.offset*int(b.tpReq.blockSize):] +} + +func (b *ringBuffer) hasRunawayBlock() bool { + if b.curTPacketHeader.getStatus()&unix.TP_STATUS_COPY != 0 { + b.releaseAndAdvance() + b.curTPacketHeader.data = nil + return true + } + + return false +} diff --git a/capture/afpacket/afring/tpacket.go b/capture/afpacket/afring/tpacket.go index 8fb60db..af82f88 100644 --- a/capture/afpacket/afring/tpacket.go +++ b/capture/afpacket/afring/tpacket.go @@ -86,56 +86,41 @@ func (t tPacketRequest) blockSizeNr() int { return int(t.blockSize * t.blockNr) } -// tPacketHeader denotes the V3 tpacket_hdr structure, c.f. -// https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_packet.h +// tPacketHeader denotes a wrapper around the raw data of a TPacket block structure type tPacketHeader struct { - data []byte - ppos uint32 - nPktsLeft uint32 + data []byte + ppos uint32 } -func (t tPacketHeader) nPkts() uint32 { - return *(*uint32)(unsafe.Pointer(&t.data[12])) // #nosec G103 +// tPacketHeaderV3 denotes the V3 tpacket_hdr structure, c.f. 
+// https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_packet.h +// Note: The struct parses only the relevant portions of the header, the rest is +// skipped / ignored by means of dummy elements of the correct in-memory size +type tPacketHeaderV3 struct { + snaplen uint32 // 12-16 + pktLen uint32 // 16-20 + _ uint32 // skip + pktMac uint16 // 24-26 + pktNet uint16 // 26-28 } func (t tPacketHeader) offsetToFirstPkt() uint32 { return *(*uint32)(unsafe.Pointer(&t.data[16])) // #nosec G103 } -// 2 * 3 * uint32 for timestamps - -// / -> Packet Header func (t tPacketHeader) nextOffset() uint32 { return *(*uint32)(unsafe.Pointer(&t.data[t.ppos])) // #nosec G103 } -// 2 * uint32 for timestamps - -func (t tPacketHeader) snapLen() uint32 { - return *(*uint32)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 -} - -func (t tPacketHeader) pktLen() uint32 { - return *(*uint32)(unsafe.Pointer(&t.data[t.ppos+16])) // #nosec G103 -} - -func (t tPacketHeader) pktLenPut(data []byte) { +func (t tPacketHeader) pktLenCopy(data []byte) { copy(data, t.data[t.ppos+16:t.ppos+20]) } -func (t tPacketHeader) packetType() byte { - return t.data[t.ppos+58] +func (t tPacketHeader) parseHeader() *tPacketHeaderV3 { + return (*tPacketHeaderV3)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 } -func (t tPacketHeader) payloadNoCopyAtOffset(offset, to uint32) []byte { - pos := t.ppos + uint32(*(*uint16)(unsafe.Pointer(&t.data[t.ppos+24]))) // #nosec G103 - return t.data[pos+offset : pos+to] -} - -func (t tPacketHeader) payloadCopyPutAtOffset(data []byte, offset, to uint32) { - pos := t.ppos + uint32(*(*uint16)(unsafe.Pointer(&t.data[t.ppos+24]))) // #nosec G103 - copy(data, t.data[pos+offset:pos+to]) -} +////////////////////////////////////////////////////////////////////////////////////////////////// func pageSizeAlign(x int) int { return int((uint(x) + pageSizeAlignment - 1) &^ (pageSizeAlignment - 1)) @@ -180,6 +165,18 @@ func blockSizeTPacketAlign(x, blockSize int) (int, 
error) { // return *(*uint32)(unsafe.Pointer(&t.data[4])) // } +// func (t tPacketHeader) nPkts() uint32 { +// return *(*uint32)(unsafe.Pointer(&t.data[12])) // #nosec G103 +// } + +// func (t tPacketHeader) snapLen() uint32 { +// return *(*uint32)(unsafe.Pointer(&t.data[t.ppos+12])) // #nosec G103 +// } + +// func (t tPacketHeader) pktLen() uint32 { +// return *(*uint32)(unsafe.Pointer(&t.data[t.ppos+16])) // #nosec G103 +// } + // func (t tPacketHeader) blockLen() uint32 { // return *(*uint32)(unsafe.Pointer(&t.data[20])) // } @@ -204,3 +201,13 @@ func blockSizeTPacketAlign(x, blockSize int) (int, error) { // func (t tPacketHeader) net() uint16 { // return *(*uint16)(unsafe.Pointer(&t.data[t.ppos+26])) // } + +// func (t tPacketHeader) payloadNoCopyAtOffset(offset, to uint32) []byte { +// pos := t.ppos + uint32(*(*uint16)(unsafe.Pointer(&t.data[t.ppos+24]))) // #nosec G103 +// return t.data[pos+offset : pos+to] +// } + +// func (t tPacketHeader) payloadCopyPutAtOffset(data []byte, offset, to uint32) { +// pos := t.ppos + uint32(*(*uint16)(unsafe.Pointer(&t.data[t.ppos+24]))) // #nosec G103 +// copy(data, t.data[pos+offset:pos+to]) +// } diff --git a/event/event_mock_test.go b/event/event_mock_test.go index 409b96f..156d2aa 100644 --- a/event/event_mock_test.go +++ b/event/event_mock_test.go @@ -5,7 +5,6 @@ package event import ( "errors" - "syscall" "testing" "time" @@ -57,13 +56,13 @@ func TestPollOnClosedFD(t *testing.T) { efdHasEvent, errno := handler.Poll(unix.POLLIN | unix.POLLERR) require.True(t, efdHasEvent) - require.Equal(t, syscall.Errno(0x0), errno) + require.Equal(t, unix.Errno(0x0), errno) _, err = handler.Efd.ReadEvent() require.Nil(t, err) for i := 0; i < 10; i++ { efdHasEvent, errno := handler.Poll(unix.POLLIN | unix.POLLERR) require.False(t, efdHasEvent) - require.Equal(t, syscall.Errno(0x9), errno) + require.Equal(t, unix.Errno(0x9), errno) } } diff --git a/event/poll.go b/event/poll.go index 9fcf9dd..b64b5eb 100644 --- a/event/poll.go +++ 
b/event/poll.go @@ -9,7 +9,10 @@ import ( "golang.org/x/sys/unix" ) -const nPollEvents = 2 +const ( + eventPollIn = unix.POLLIN + eventConnReset = unix.POLLHUP | unix.POLLERR +) ///////////////////////////////////////////////////////////////////////////////////////// @@ -29,16 +32,3 @@ func (p *Handler) recvfrom(buf []byte, flags int) (int, uint8, error) { return n, pktType, nil } - -func poll(pollEvents [nPollEvents]unix.PollFd) (bool, unix.Errno) { - errno := pollBlock(&pollEvents[0], nPollEvents) - if errno != 0 { - return pollEvents[0].Revents&unix.POLLIN != 0, errno - } - - if pollEvents[1].Revents&unix.POLLHUP != 0 || pollEvents[1].Revents&unix.POLLERR != 0 { - errno = unix.ECONNRESET - } - - return pollEvents[0].Revents&unix.POLLIN != 0, errno -} diff --git a/event/poll_386.s b/event/poll_386.s index 97fcb5d..803c4fe 100644 --- a/event/poll_386.s +++ b/event/poll_386.s @@ -1,24 +1,27 @@ +// +build !slimcap_noasm + #include "textflag.h" #define INVOKE_SYSCALL INT $0x80 #define SYS__PPOLL 0x135 +#define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd, nfds int) (err syscall.Errno) -TEXT ·pollBlock(SB),NOSPLIT,$0-12 +// func pollBlock(fds *unix.PollFd) (err unix.Errno) +TEXT ·pollBlock(SB),NOSPLIT,$0-8 CALL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVL $SYS__PPOLL, AX // Prepare / perform ppoll() SYSCALL MOVL fds+0(FP), BX // PollFDs parameter - MOVL nfds+4(FP), CX // Put nFDs parameter + MOVL $N_EVTS, CX // Put nFDs parameter (constant N_EVTS) MOVL $0x0, DX // Put timeout parameter (set to NULL) MOVL $0x0, SI // Put sigmask parameter (skip) INVOKE_SYSCALL CMPL AX, $0xfffff002 // No error / EINTR JLS success // Jump to success NEGL AX // Negate SYSCALL errno - MOVL AX, err+8(FP) // Store error code in err return value + MOVL AX, err+4(FP) // Store error code in err return value CALL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return success: - MOVL $0, err+8(FP) // 
Store NULL error code in err return value + MOVL $0, err+4(FP) // Store NULL error code in err return value CALL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return diff --git a/event/poll_amd64.s b/event/poll_amd64.s index b4a1dfe..0738512 100644 --- a/event/poll_amd64.s +++ b/event/poll_amd64.s @@ -1,12 +1,15 @@ +// +build !slimcap_noasm + #include "textflag.h" #define SYS__PPOLL 0x10f +#define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd, nfds int) (err syscall.Errno) -TEXT ·pollBlock(SB),NOSPLIT,$0-24 +// func pollBlock(fds *unix.PollFd) (err unix.Errno) +TEXT ·pollBlock(SB),NOSPLIT,$0-16 CALL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVQ fds+0(FP), DI // PollFDs parameter - MOVQ nfds+8(FP), SI // Put nFDs parameter + MOVQ $N_EVTS, SI // Put nFDs parameter (constant N_EVTS) MOVQ $0x0, DX // Put timeout parameter (set to NULL) MOVQ $0x0, R10 // Put sigmask parameter (skip) MOVQ $SYS__PPOLL, AX // Prepare / perform ppoll() SYSCALL @@ -14,10 +17,10 @@ TEXT ·pollBlock(SB),NOSPLIT,$0-24 CMPQ AX, $0xfffffffffffff002 // No error / EINTR JLS success // Jump to success NEGQ AX // Negate SYSCALL errno - MOVQ AX, err+16(FP) // Store error code in err return value + MOVQ AX, err+8(FP) // Store error code in err return value CALL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return success: - MOVQ $0, err+16(FP) // Store NULL error code in err return value + MOVQ $0, err+8(FP) // Store NULL error code in err return value CALL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return diff --git a/event/poll_arm.s b/event/poll_arm.s index 38ed490..2761d12 100644 --- a/event/poll_arm.s +++ b/event/poll_arm.s @@ -1,24 +1,27 @@ +// +build !slimcap_noasm + #include "textflag.h" #define SYS__PPOLL 0x150 +#define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd, nfds int) (err syscall.Errno) -TEXT 
·pollBlock(SB),NOSPLIT,$0-12 +// func pollBlock(fds *unix.PollFd) (err unix.Errno) +TEXT ·pollBlock(SB),NOSPLIT,$0-8 BL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVW $SYS__PPOLL, R7 // Prepare / perform ppoll() SYSCALL MOVW fds+0(FP), R0 // PollFDs parameter - MOVW nfds+4(FP), R1 // Put nFDs parameter + MOVW $N_EVTS, R1 // Put nFDs parameter (constant N_EVTS) MOVW $0x0, R2 // Put timeout parameter (set to NULL) MOVW $0x0, R3 // Put sigmask parameter (skip) SWI $0 CMP $0xfffff002, R0 // No error / EINTR BLS success // Jump to success RSB $0, R0, R0 // Negate SYSCALL errno - MOVW R0, err+8(FP) // Store error code in err return value + MOVW R0, err+4(FP) // Store error code in err return value BL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return success: MOVW $0, R0 - MOVW R0, err+8(FP) // Store NULL error code in err return value + MOVW R0, err+4(FP) // Store NULL error code in err return value BL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return diff --git a/event/poll_arm64.s b/event/poll_arm64.s index 1570825..a65021b 100644 --- a/event/poll_arm64.s +++ b/event/poll_arm64.s @@ -1,12 +1,15 @@ +// +build !slimcap_noasm + #include "textflag.h" #define SYS__PPOLL 0x49 +#define N_EVTS 0x02 -// func pollBlock(fds *unix.PollFd, nfds int) (err syscall.Errno) -TEXT ·pollBlock(SB),NOSPLIT,$0-24 +// func pollBlock(fds *unix.PollFd) (err unix.Errno) +TEXT ·pollBlock(SB),NOSPLIT,$0-16 BL runtime·entersyscallblock(SB) // Call blocking SYSCALL directive from runtime package MOVD fds+0(FP), R0 // PollFDs parameter - MOVD nfds+8(FP), R1 // Put nFDs parameter + MOVD $N_EVTS, R1 // Put nFDs parameter (constant N_EVTS) MOVD $0x0, R2 // Put timeout parameter (set to NULL) MOVD $0x0, R3 // Put sigmask parameter (skip) MOVD $SYS__PPOLL, R8 // Prepare / perform ppoll() SYSCALL @@ -14,10 +17,10 @@ TEXT ·pollBlock(SB),NOSPLIT,$0-24 CMP 
$0xfffffffffffff002, R0 // No error / EINTR BLS success // Jump to success NEG R0, R0 // Negate SYSCALL errno - MOVD R0, err+16(FP) // Store error code in err return value + MOVD R0, err+8(FP) // Store error code in err return value BL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return success: - MOVD $0, err+16(FP) // Store NULL error code in err return value + MOVD $0, err+8(FP) // Store NULL error code in err return value BL runtime·exitsyscall(SB) // Finalize SYSCALL using the directive from runtime package RET // Return diff --git a/event/poll_asm.go b/event/poll_asm.go index 90596e6..9c349ae 100644 --- a/event/poll_asm.go +++ b/event/poll_asm.go @@ -1,28 +1,26 @@ -//go:build (linux && amd64) || (linux && arm64) || (linux && arm) || (linux && 386) +//go:build ((linux && amd64) || (linux && arm64) || (linux && arm) || (linux && 386)) && !slimcap_noasm // +build linux,amd64 linux,arm64 linux,arm linux,386 +// +build !slimcap_noasm package event import ( - "golang.org/x/sys/unix" - _ "unsafe" // required to support go:linkname + + "golang.org/x/sys/unix" ) //go:noescape //go:nosplit -func pollBlock(fds *unix.PollFd, nfds int) unix.Errno +//go:norace +func pollBlock(fds *unix.PollFd) unix.Errno //////////////////////////////////// -// The following stubs are required to allow unsafe access to their equivalent in the runtime package from assembly +// The following stub is required to allow unsafe access to its equivalent in the runtime package from assembly // This might break in the future, but there various sources that claim that it's widely used (and even issues at least // imply that it's ok to use in favor of exposing that functionality (c.f. 
https://github.com/golang/go/issues/29734) //go:linkname entersyscallblock runtime.entersyscallblock //go:noescape func entersyscallblock() //nolint:deadcode - -//go:linkname exitsyscall runtime.exitsyscall -//go:noescape -func exitsyscall() //nolint:deadcode diff --git a/event/poll_default.go b/event/poll_default.go index 8c8a1fd..0d1ef36 100644 --- a/event/poll_default.go +++ b/event/poll_default.go @@ -1,5 +1,5 @@ -//go:build linux && !amd64 && !arm64 && !arm && !386 -// +build linux,!amd64,!arm64,!arm,!386 +//go:build (linux && !amd64 && !arm64 && !arm && !386) || slimcap_noasm +// +build linux,!amd64,!arm64,!arm,!386 slimcap_noasm package event @@ -9,11 +9,13 @@ import ( "golang.org/x/sys/unix" ) -func pollBlock(fds *unix.PollFd, nfds int) unix.Errno { +const nPollEvents = uintptr(0x02) + +func pollBlock(fds *unix.PollFd) unix.Errno { // #nosec: G103 _, _, e := unix.Syscall6(unix.SYS_PPOLL, uintptr(unsafe.Pointer(fds)), - uintptr(nfds), uintptr(unsafe.Pointer(nil)), 0, 0, 0) + nPollEvents, uintptr(unsafe.Pointer(nil)), 0, 0, 0) return e } diff --git a/event/poll_mock.go b/event/poll_mock.go index 0a3c052..76beafa 100644 --- a/event/poll_mock.go +++ b/event/poll_mock.go @@ -26,7 +26,7 @@ type Handler struct { // Poll polls (blocking, hence no timeout) for events on the file descriptor and the event // file descriptor (waiting for a POLLIN event). 
-func (p *Handler) Poll(events int16) (bool, unix.Errno) { +func (p *Handler) Poll(events int16) (hasEvent bool, errno unix.Errno) { pollEvents := [...]unix.PollFd{ { Fd: int32(p.Efd), @@ -38,21 +38,25 @@ func (p *Handler) Poll(events int16) (bool, unix.Errno) { }, } - // Fast path: If this is not a MockHandler, simply return a regular poll + // Perform blocking PPOLL + errno = pollBlock(&pollEvents[0]) + if errno == 0 && pollEvents[1].Revents&eventConnReset != 0 { + errno = unix.ECONNRESET + } + hasEvent = pollEvents[0].Revents&eventPollIn != 0 + + // Fast path: If this is not a MockHandler, simply return if p.mockFd == nil { - return poll(pollEvents) + return } - // MockHandler logic: Poll, then release the semaphore, indicating data has + // MockHandler logic: Release the semaphore, indicating data has // been consumed - hasEvent, errno := poll(pollEvents) if !hasEvent && errno == 0 { - if errno := p.mockFd.ReleaseSemaphore(); errno != 0 { - return false, errno - } + errno = p.mockFd.ReleaseSemaphore() } - return hasEvent, errno + return } // Recvfrom retrieves data directly from the socket diff --git a/event/poll_nomock.go b/event/poll_nomock.go index 0260268..fd60d96 100644 --- a/event/poll_nomock.go +++ b/event/poll_nomock.go @@ -21,7 +21,7 @@ type Handler struct { // Poll polls (blocking, hence no timeout) for events on the file descriptor and the event // file descriptor (waiting for a POLLIN event). 
-func (p *Handler) Poll(events int16) (bool, unix.Errno) { +func (p *Handler) Poll(events int16) (hasEvent bool, errno unix.Errno) { pollEvents := [...]unix.PollFd{ { Fd: int32(p.Efd), @@ -33,7 +33,14 @@ func (p *Handler) Poll(events int16) (bool, unix.Errno) { }, } - return poll(pollEvents) + // Perform blocking PPOLL + errno = pollBlock(&pollEvents[0]) + if errno == 0 && pollEvents[1].Revents&eventConnReset != 0 { + errno = unix.ECONNRESET + } + hasEvent = pollEvents[0].Revents&eventPollIn != 0 + + return } // Recvfrom retrieves data directly from the socket diff --git a/examples/dump/main.go b/examples/dump/main.go index 0131bd3..86cbb11 100644 --- a/examples/dump/main.go +++ b/examples/dump/main.go @@ -10,6 +10,7 @@ import ( "os" "github.com/els0r/telemetry/logging" + "github.com/fako1024/slimcap/capture" "github.com/fako1024/slimcap/capture/afpacket/afring" "github.com/fako1024/slimcap/link" ) @@ -65,7 +66,7 @@ func main() { logger.Infof("Reading %d packets from wire (zero-copy function call)...", maxPkts) for i := 0; i < maxPkts; i++ { if err := listener.NextPacketFn(func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) (err error) { - logger.Infof("Received packet with Payload on `%s` (total len %d): %v (inbound: %v)", devName, totalLen, payload, p.IsInbound()) + logger.Infof("Received packet with Payload on `%s` (total len %d): %v (inbound: %v)", devName, totalLen, payload, pktType != capture.PacketOutgoing) return }); err != nil { logger.Fatalf("error during capture (zero-copy function call) on `%s`: %s", devName, err) diff --git a/examples/trace/trace.go b/examples/trace/trace.go index f16a0f7..a104f5b 100644 --- a/examples/trace/trace.go +++ b/examples/trace/trace.go @@ -9,12 +9,12 @@ import ( "sort" "strings" "sync" - "syscall" "github.com/fako1024/slimcap/capture" "github.com/fako1024/slimcap/capture/afpacket/afpacket" "github.com/fako1024/slimcap/capture/afpacket/afring" "github.com/fako1024/slimcap/link" + "golang.org/x/sys/unix" ) // 
Capture denotes a simple capturing structure / manager @@ -163,7 +163,7 @@ func (c *Capture) Run() (err error) { logger.Infof("attempting capture on interfaces [%s]", strings.Join(capturing, ",")) sigExitChan := make(chan os.Signal, 1) - signal.Notify(sigExitChan, syscall.SIGTERM, os.Interrupt) + signal.Notify(sigExitChan, unix.SIGTERM, os.Interrupt) var listeners []capture.Source // Fork a goroutine for each interface diff --git a/link/interface_linux.go b/link/interface_linux.go index 78fad8f..2ac09e4 100644 --- a/link/interface_linux.go +++ b/link/interface_linux.go @@ -10,7 +10,8 @@ import ( "os" "strconv" "strings" - "syscall" + + "golang.org/x/sys/unix" ) const ( @@ -60,7 +61,7 @@ func (i Interface) IsUp() (bool, error) { return false, err } - return flags&syscall.IFF_UP != 0, nil + return flags&unix.IFF_UP != 0, nil } ////////////////////////////////////////////////////////////////////////////////