Skip to content

Commit

Permalink
[Packetbeat] Add support for af_packet fanout group (#35453)
Browse files Browse the repository at this point in the history
To scale processing across multiple Packetbeat processes, a fanout group
identifier can be specified. When fanout_group is used the Linux kernel splits
packets across Packetbeat instances in the same group by using a flow hash. It
computes the flow hash modulo with the number of Packetbeat processes in order
to consistently route flows to the same Packetbeat instance.

The value must be between 0 and 65535. By default, no value is set.

This is only available on Linux and requires using type: af_packet. Each process
must be running in same network namespace. All processes must use the same
interface settings. You must take responsibility for running multiple instances of
Packetbeat.

Closes #35451
  • Loading branch information
andrewkroh authored May 16, 2023
1 parent d0a51dc commit 3eb1860
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415

*Packetbeat*

- Added `packetbeat.interfaces.fanout_group` to allow a Packetbeat sniffer to join an AF_PACKET fanout group. {issue}35451[35451] {pull}35453[35453]

*Winlogbeat*

Expand Down
14 changes: 14 additions & 0 deletions packetbeat/_meta/config/beat.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ packetbeat.interfaces.internal_networks:
# The default is 30 MB.
#packetbeat.interfaces.buffer_size_mb: 30

# To scale processing across multiple Packetbeat processes, a fanout group
# identifier can be specified. When `fanout_group` is used the Linux kernel splits
# packets across Packetbeat instances in the same group by using a flow hash. It
# computes the flow hash modulo with the number of Packetbeat processes in order
# to consistently route flows to the same Packetbeat instance.
#
# The value must be between 0 and 65535. By default, no value is set.
#
# This is only available on Linux and requires using `type: af_packet`. Each process
# must be running in same network namespace. All processes must use the same
# interface settings. You must take responsibility for running multiple instances
# of Packetbeat.
#packetbeat.interfaces.fanout_group: ~

# Packetbeat automatically generates a BPF for capturing only the traffic on
# ports where it expects to find known protocols. Use this settings to tell
# Packetbeat to generate a BPF filter that accepts VLAN tags.
Expand Down
10 changes: 10 additions & 0 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

var errFanoutGroupAFPacketOnly = errors.New("fanout_group is only valid with af_packet type")

type Config struct {
Interface *InterfaceConfig `config:"interfaces"`
Interfaces []InterfaceConfig `config:"interfaces"`
Expand Down Expand Up @@ -124,6 +126,7 @@ type InterfaceConfig struct {
BufferSizeMb int `config:"buffer_size_mb"`
EnableAutoPromiscMode bool `config:"auto_promisc_mode"`
InternalNetworks []string `config:"internal_networks"`
FanoutGroup *uint16 `config:"fanout_group"` // Fanout group ID for AF_PACKET.
TopSpeed bool
Dumpfile string // Dumpfile is the basename of pcap dumpfiles. The file names will have a creation time stamp and .pcap extension appended.
OneAtATime bool
Expand Down Expand Up @@ -151,3 +154,10 @@ type ProtocolCommon struct {
func (f *Flows) IsEnabled() bool {
return f != nil && (f.Enabled == nil || *f.Enabled)
}

func (i InterfaceConfig) Validate() error {
if i.Type != "af_packet" && i.FanoutGroup != nil {
return errFanoutGroupAFPacketOnly
}
return nil
}
33 changes: 33 additions & 0 deletions packetbeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package config

import (
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -446,6 +447,33 @@ protocols:
`,
wantErr: errors.New("duplicated device configurations: default_route"),
},
{
name: "af_packet_fanout_group",
want: Config{
Interfaces: []InterfaceConfig{
{
Device: "any",
Type: "af_packet",
FanoutGroup: pointer(uint16(1)),
},
},
},
config: `
interfaces:
device: any
type: af_packet
fanout_group: 1
`,
},
{
name: "invalid_type_fanout_group",
wantErr: fmt.Errorf("%w accessing 'interfaces'", errFanoutGroupAFPacketOnly),
config: `
interfaces:
device: any
fanout_group: 1
`,
},
}

func TestFromStatic(t *testing.T) {
Expand Down Expand Up @@ -513,3 +541,8 @@ func configC(a, b *config.C) bool {
}
return cmp.Equal(ma, mb)
}

// pointer returns a pointer to val.
func pointer[T any](val T) *T {
return &val
}
24 changes: 24 additions & 0 deletions packetbeat/docs/packetbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,30 @@ packetbeat.interfaces.type: af_packet
packetbeat.interfaces.buffer_size_mb: 100
------------------------------------------------------------------------------

[float]
==== `fanout_group`

To scale processing across multiple Packetbeat processes, a fanout group
identifier can be specified. When `fanout_group` is used the Linux kernel splits
packets across Packetbeat instances in the same group by using a flow hash. It
computes the flow hash modulo with the number of Packetbeat processes in order
to consistently route flows to the same Packetbeat instance.

The value must be between 0 and 65535. By default, no value is set.

This is only available on Linux and requires using `type: af_packet`. Each process
must be running in same network namespace. All processes must use the same
interface settings. You must take responsibility for running multiple instances
of Packetbeat.

Example:

[source,yaml]
------------------------------------------------------------------------------
packetbeat.interfaces.type: af_packet
packetbeat.interfaces.fanout_group: 1
------------------------------------------------------------------------------

[float]
==== `auto_promisc_mode`

Expand Down
14 changes: 14 additions & 0 deletions packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ packetbeat.interfaces.internal_networks:
# The default is 30 MB.
#packetbeat.interfaces.buffer_size_mb: 30

# To scale processing across multiple Packetbeat processes, a fanout group
# identifier can be specified. When `fanout_group` is used the Linux kernel splits
# packets across Packetbeat instances in the same group by using a flow hash. It
# computes the flow hash modulo with the number of Packetbeat processes in order
# to consistently route flows to the same Packetbeat instance.
#
# The value must be between 0 and 65535. By default, no value is set.
#
# This is only available on Linux and requires using `type: af_packet`. Each process
# must be running in same network namespace. All processes must use the same
# interface settings. You must take responsibility for running multiple instances
# of Packetbeat.
#packetbeat.interfaces.fanout_group: ~

# Packetbeat automatically generates a BPF for capturing only the traffic on
# ports where it expects to find known protocols. Use this settings to tell
# Packetbeat to generate a BPF filter that accepts VLAN tags.
Expand Down
14 changes: 14 additions & 0 deletions packetbeat/sniffer/afpacket.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,22 @@ package sniffer

import (
"fmt"
"time"
)

type afPacketConfig struct {
// Device name (e.g. eth0). 'any' may be used to listen on all interfaces.
Device string
// Size of frame. A frame can be of any size with the only condition it can fit in a block.
FrameSize int
// Minimal size of contiguous block. Must be divisible by the FrameSize and OS page size.
BlockSize int
NumBlocks int // Number of blocks.
PollTimeout time.Duration // Duration that poll() should block waiting for data.
FanoutGroupID *uint16 // Optional fanout group identifier.
Promiscuous bool // Put device into promiscuous mode. Ignored when using 'any' device.
}

// afpacketComputeSize computes the block_size and the num_blocks in such a way
// that the allocated mmap buffer is close to but smaller than target_size_mb.
// The restriction is that the block_size must be divisible by both the frame
Expand Down
51 changes: 30 additions & 21 deletions packetbeat/sniffer/afpacket_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package sniffer
import (
"fmt"
"syscall"
"time"
"unsafe"

"github.com/google/gopacket"
Expand All @@ -44,50 +43,60 @@ type afpacketHandle struct {
log *logp.Logger
}

func newAfpacketHandle(device string, snaplen, block_size, num_blocks int, timeout time.Duration, autoPromiscMode bool) (*afpacketHandle, error) {
func newAfpacketHandle(c afPacketConfig) (*afpacketHandle, error) {
var err error
var promiscEnabled bool
log := logp.NewLogger("sniffer")

if autoPromiscMode {
promiscEnabled, err = isPromiscEnabled(device)
if c.Promiscuous {
promiscEnabled, err = isPromiscEnabled(c.Device)
if err != nil {
log.Errorf("Failed to get promiscuous mode for device '%s': %v", device, err)
log.Errorf("Failed to get promiscuous mode for device '%s': %v", c.Device, err)
}

if !promiscEnabled {
if setPromiscErr := setPromiscMode(device, true); setPromiscErr != nil {
if setPromiscErr := setPromiscMode(c.Device, true); setPromiscErr != nil {
log.Warnf("Failed to set promiscuous mode for device '%s'. "+
"Packetbeat may be unable to see any network traffic. Please follow packetbeat "+
"FAQ to learn about mitigation: Error: %v", device, err)
"FAQ to learn about mitigation: Error: %v", c.Device, err)
}
}
}

h := &afpacketHandle{
promiscPreviousState: promiscEnabled,
frameSize: snaplen,
device: device,
promiscPreviousStateDetected: autoPromiscMode && err == nil,
frameSize: c.FrameSize,
device: c.Device,
promiscPreviousStateDetected: c.Promiscuous && err == nil,
log: log,
}

if device == "any" {
if c.Device == "any" {
h.TPacket, err = afpacket.NewTPacket(
afpacket.OptFrameSize(snaplen),
afpacket.OptBlockSize(block_size),
afpacket.OptNumBlocks(num_blocks),
afpacket.OptPollTimeout(timeout))
afpacket.OptFrameSize(c.FrameSize),
afpacket.OptBlockSize(c.BlockSize),
afpacket.OptNumBlocks(c.NumBlocks),
afpacket.OptPollTimeout(c.PollTimeout))
} else {
h.TPacket, err = afpacket.NewTPacket(
afpacket.OptInterface(device),
afpacket.OptFrameSize(snaplen),
afpacket.OptBlockSize(block_size),
afpacket.OptNumBlocks(num_blocks),
afpacket.OptPollTimeout(timeout))
afpacket.OptInterface(c.Device),
afpacket.OptFrameSize(c.FrameSize),
afpacket.OptBlockSize(c.BlockSize),
afpacket.OptNumBlocks(c.NumBlocks),
afpacket.OptPollTimeout(c.PollTimeout))
}
if err != nil {
return nil, fmt.Errorf("failed creating af_packet socket: %w", err)
}

if c.FanoutGroupID != nil {
if err = h.TPacket.SetFanout(afpacket.FanoutHashWithDefrag, *c.FanoutGroupID); err != nil {
return nil, fmt.Errorf("failed setting af_packet fanout group: %w", err)
}
log.Infof("Joined af_packet fanout group %v", *c.FanoutGroupID)
}

return h, err
return h, nil
}

func (h *afpacketHandle) ReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) {
Expand Down
3 changes: 1 addition & 2 deletions packetbeat/sniffer/afpacket_nonlinux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package sniffer

import (
"errors"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
Expand All @@ -32,7 +31,7 @@ var errAFPacketLinuxOnly = errors.New("af_packet MMAP sniffing is only available

type afpacketHandle struct{}

func newAfpacketHandle(_ string, _, _, _ int, _ time.Duration, _ bool) (*afpacketHandle, error) {
func newAfpacketHandle(_ afPacketConfig) (*afpacketHandle, error) {
return nil, errAFPacketLinuxOnly
}

Expand Down
10 changes: 9 additions & 1 deletion packetbeat/sniffer/sniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,15 @@ func openAFPacket(device, filter string, cfg *config.InterfaceConfig) (snifferHa
}

timeout := 500 * time.Millisecond
h, err := newAfpacketHandle(device, szFrame, szBlock, numBlocks, timeout, cfg.EnableAutoPromiscMode)
h, err := newAfpacketHandle(afPacketConfig{
Device: device,
FrameSize: szFrame,
BlockSize: szBlock,
NumBlocks: numBlocks,
PollTimeout: timeout,
FanoutGroupID: cfg.FanoutGroup,
Promiscuous: cfg.EnableAutoPromiscMode,
})
if err != nil {
return nil, err
}
Expand Down
14 changes: 14 additions & 0 deletions x-pack/packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ packetbeat.interfaces.internal_networks:
# The default is 30 MB.
#packetbeat.interfaces.buffer_size_mb: 30

# To scale processing across multiple Packetbeat processes, a fanout group
# identifier can be specified. When `fanout_group` is used the Linux kernel splits
# packets across Packetbeat instances in the same group by using a flow hash. It
# computes the flow hash modulo with the number of Packetbeat processes in order
# to consistently route flows to the same Packetbeat instance.
#
# The value must be between 0 and 65535. By default, no value is set.
#
# This is only available on Linux and requires using `type: af_packet`. Each process
# must be running in same network namespace. All processes must use the same
# interface settings. You must take responsibility for running multiple instances
# of Packetbeat.
#packetbeat.interfaces.fanout_group: ~

# Packetbeat automatically generates a BPF for capturing only the traffic on
# ports where it expects to find known protocols. Use this settings to tell
# Packetbeat to generate a BPF filter that accepts VLAN tags.
Expand Down

0 comments on commit 3eb1860

Please sign in to comment.