Skip to content

Commit

Permalink
Add back support for pcapng
Browse files Browse the repository at this point in the history
Signed-off-by: Chance Zibolski <chance.zibolski@gmail.com>
  • Loading branch information
chancez committed Jul 21, 2024
1 parent 2e83dfa commit 11e2e5b
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 176 deletions.
4 changes: 0 additions & 4 deletions cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,6 @@ func (g *gateway) updatePeerMetadata(peer serf.Member, update *capperpb.NodeMeta
}

func (g *gateway) CaptureQuery(req *capperpb.CaptureQueryRequest, stream capperpb.Querier_CaptureQueryServer) error {
if req.GetCaptureRequest().GetOutputFormat() == capperpb.PcapOutputFormat_OUTPUT_FORMAT_PCAPNG {
return status.Error(codes.InvalidArgument, "pcapng format is unsupported by query API")
}

ctx := stream.Context()
peers := g.getPeers()
var nodes []serf.Member
Expand Down
2 changes: 1 addition & 1 deletion cmd/local_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func localCapture(ctx context.Context, log *slog.Logger, ifaces []string, netNam
defer handle.Close()
linkType := handle.LinkType()

handler := newCommonOutputHandler(linkType, uint32(conf.Snaplen), printPackets, outputPath, isDir)
handler := newCommonOutputHandler(linkType, uint32(conf.Snaplen), printPackets, outputPath, isDir, conf.OutputFormat)
defer handler.Flush()

err = handle.Start(ctx, handler)
Expand Down
118 changes: 88 additions & 30 deletions cmd/output_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ type commonOutputHandler struct {
handler capture.PacketHandler
}

func newCommonOutputHandler(linkType layers.LinkType, snaplen uint32, printPackets bool, outputPath string, isDir bool) *commonOutputHandler {
func newCommonOutputHandler(linkType layers.LinkType, snaplen uint32, printPackets bool, outputPath string, isDir bool, outputFormat capperpb.PcapOutputFormat) *commonOutputHandler {
var handlers []capture.PacketHandler
if printPackets {
handlers = append(handlers, capture.PacketPrinterHandler)
}
if outputPath != "" {
outputFileHandler := newOutputFileHandler(outputPath, isDir, linkType, snaplen)
outputFileHandler := newOutputFileHandler(outputPath, isDir, linkType, snaplen, outputFormat)
handlers = append(handlers, outputFileHandler)
}
handler := capture.ChainPacketHandlers(handlers...)
Expand All @@ -44,19 +44,30 @@ type outputFileHandler struct {
outputPath string
isDir bool

writers map[string]capture.PacketWriter
closers []io.Closer
linkType layers.LinkType
snaplen uint32
writers map[string]capture.PacketWriter
// map from nodeName -> network namespace -> interfaceName
closers []io.Closer
linkType layers.LinkType
snaplen uint32
outputFormat capperpb.PcapOutputFormat

// interfaceConfigured tracks if we've configured a given interface when
// using an NgWriter. The map is keyed by the interface index.
// Unfortunately, since pcapng didn't consider multi-host captures, this
// means the same interface index across different hosts may clash.
// TODO: Update NgWriter to key by more than the interface index.
interfaceConfigured map[int]struct{}
}

func newOutputFileHandler(outputPath string, isDir bool, linkType layers.LinkType, snaplen uint32) *outputFileHandler {
func newOutputFileHandler(outputPath string, isDir bool, linkType layers.LinkType, snaplen uint32, outputFormat capperpb.PcapOutputFormat) *outputFileHandler {
return &outputFileHandler{
outputPath: outputPath,
isDir: isDir,
writers: make(map[string]capture.PacketWriter),
linkType: linkType,
snaplen: snaplen,
outputPath: outputPath,
isDir: isDir,
writers: make(map[string]capture.PacketWriter),
linkType: linkType,
snaplen: snaplen,
outputFormat: outputFormat,
interfaceConfigured: make(map[int]struct{}),
}
}

Expand All @@ -65,32 +76,79 @@ func (h *outputFileHandler) HandlePacket(p gopacket.Packet) error {
if err != nil {
return fmt.Errorf("error getting packet ancillary data: %w", err)
}
identifier := normalizeFilename(ad.NodeName, ad.Netns, ad.IfaceName, capperpb.PcapOutputFormat_OUTPUT_FORMAT_PCAP)

var identifier string
if h.isDir {
identifier = normalizeFilename(ad.NodeName, ad.Netns, ad.IfaceName, h.outputFormat)
}

packetWriter, exists := h.writers[identifier]
if !exists {
var w io.Writer
if h.isDir {
f, err := os.Create(filepath.Join(h.outputPath, identifier))
if err != nil {
return fmt.Errorf("error opening output: %w", err)
}
h.closers = append(h.closers, f)
w = f
} else if h.outputPath == "-" {
w = os.Stdout
} else {
f, err := os.Create(h.outputPath)
if err != nil {
return fmt.Errorf("error opening output: %w", err)
packetWriter, err = h.newPacketWriter(identifier, p.Metadata().InterfaceIndex, ad)
if err != nil {
return err
}
h.writers[identifier] = packetWriter
} else {
// We already have a writer, check if we need to update it.
// If we're outputting to a directory, then we have a writer for each interface, so there's no need to update it,
// And we only need to add interfaces for pcapng format.
if !h.isDir && h.outputFormat == capperpb.PcapOutputFormat_OUTPUT_FORMAT_PCAPNG {
if _, configured := h.interfaceConfigured[p.Metadata().InterfaceIndex]; !configured {
ngWriter := packetWriter.(*capture.PcapNgWriter)
_, err := ngWriter.AddInterface(ad.IfaceName, p.Metadata().InterfaceIndex, layers.LinkType(ad.LinkType))
if err != nil {
return err
}
h.interfaceConfigured[p.Metadata().InterfaceIndex] = struct{}{}
}
h.closers = append(h.closers, f)
w = f
}
packetWriter = capture.NewPcapWriter(w, h.linkType, h.snaplen)
}
return packetWriter.WritePacket(p.Metadata().CaptureInfo, p.Data())
}

func (h *outputFileHandler) newPacketWriter(identifier string, interfaceIndex int, ad *capperpb.AncillaryPacketData) (capture.PacketWriter, error) {
var packetWriter capture.PacketWriter
var w io.Writer
if h.isDir {
f, err := os.Create(filepath.Join(h.outputPath, identifier))
if err != nil {
return nil, fmt.Errorf("error opening output: %w", err)
}
h.closers = append(h.closers, f)
w = f
} else if h.outputPath == "-" {
w = os.Stdout
} else {
f, err := os.Create(h.outputPath)
if err != nil {
return nil, fmt.Errorf("error opening output: %w", err)
}
h.closers = append(h.closers, f)
w = f
}

switch h.outputFormat {
case capperpb.PcapOutputFormat_OUTPUT_FORMAT_PCAPNG:
var err error
packetWriter, err = capture.NewPcapNgWriter(
w, h.linkType, h.snaplen,
ad.GetIfaceName(), interfaceIndex,
ad.GetHardware(), ad.GetOperatingSystem(),
ad.GetNodeName(),
)
if err != nil {
return nil, err
}
h.interfaceConfigured[interfaceIndex] = struct{}{}
case capperpb.PcapOutputFormat_OUTPUT_FORMAT_PCAP:
fallthrough
default:
packetWriter = capture.NewPcapWriter(w, h.linkType, h.snaplen)
}
return packetWriter, nil
}

func (h *outputFileHandler) Flush() error {
var err error
for _, w := range h.writers {
Expand Down
6 changes: 3 additions & 3 deletions cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ func runQuery(cmd *cobra.Command, args []string) error {
BufferSize: int64(captureOpts.CaptureConfig.BufferSize),
},
}
return queryCapture(ctx, captureOpts.Logger, remoteOpts, req, captureOpts.OutputFile, captureOpts.AlwaysPrint)
return queryCapture(ctx, captureOpts.Logger, remoteOpts, req, captureOpts.OutputFile, captureOpts.AlwaysPrint, captureOpts.CaptureConfig.OutputFormat)
}

func queryCapture(ctx context.Context, log *slog.Logger, remoteOpts remoteOpts, req *capperpb.CaptureQueryRequest, outputPath string, alwaysPrint bool) error {
func queryCapture(ctx context.Context, log *slog.Logger, remoteOpts remoteOpts, req *capperpb.CaptureQueryRequest, outputPath string, alwaysPrint bool, outputFormat capperpb.PcapOutputFormat) error {
var isDir bool
if outputPath != "" {
fi, err := os.Stat(outputPath)
Expand Down Expand Up @@ -175,7 +175,7 @@ func queryCapture(ctx context.Context, log *slog.Logger, remoteOpts remoteOpts,
defer handle.Close()
linkType := handle.LinkType()

handler := newCommonOutputHandler(linkType, uint32(req.GetCaptureRequest().GetSnaplen()), printPackets, outputPath, isDir)
handler := newCommonOutputHandler(linkType, uint32(req.GetCaptureRequest().GetSnaplen()), printPackets, outputPath, isDir, outputFormat)
defer handler.Flush()

err = handle.Start(ctx, handler)
Expand Down
7 changes: 3 additions & 4 deletions cmd/remote_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,11 @@ func runRemoteCapture(cmd *cobra.Command, args []string) error {
K8SPodFilter: pod,
NoPromiscuousMode: !captureOpts.CaptureConfig.Promisc,
BufferSize: int64(captureOpts.CaptureConfig.BufferSize),
OutputFormat: captureOpts.CaptureConfig.OutputFormat,
}
return remoteCapture(ctx, captureOpts.Logger, remoteOpts, req, captureOpts.OutputFile, captureOpts.AlwaysPrint)
return remoteCapture(ctx, captureOpts.Logger, remoteOpts, req, captureOpts.OutputFile, captureOpts.AlwaysPrint, captureOpts.CaptureConfig.OutputFormat)
}

func remoteCapture(ctx context.Context, log *slog.Logger, remoteOpts remoteOpts, req *capperpb.CaptureRequest, outputPath string, alwaysPrint bool) error {
func remoteCapture(ctx context.Context, log *slog.Logger, remoteOpts remoteOpts, req *capperpb.CaptureRequest, outputPath string, alwaysPrint bool, outputFormat capperpb.PcapOutputFormat) error {
var isDir bool
if outputPath != "" {
fi, err := os.Stat(outputPath)
Expand Down Expand Up @@ -132,7 +131,7 @@ func remoteCapture(ctx context.Context, log *slog.Logger, remoteOpts remoteOpts,
defer handle.Close()
linkType := handle.LinkType()

handler := newCommonOutputHandler(linkType, uint32(req.GetSnaplen()), printPackets, outputPath, isDir)
handler := newCommonOutputHandler(linkType, uint32(req.GetSnaplen()), printPackets, outputPath, isDir, outputFormat)
defer handler.Flush()

err = handle.Start(ctx, handler)
Expand Down
9 changes: 2 additions & 7 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,6 @@ func (s *server) getPod(ctx context.Context, podName, namespace string) (*contai
}

func (s *server) Capture(req *capperpb.CaptureRequest, stream capperpb.Capper_CaptureServer) error {
if req.GetOutputFormat() == capperpb.PcapOutputFormat_OUTPUT_FORMAT_PCAPNG {
return status.Error(codes.InvalidArgument, "pcapng format is unsupported by capture API")
}

ctx := stream.Context()
var netns string
if req.GetK8SPodFilter() != nil {
Expand All @@ -284,7 +280,6 @@ func (s *server) Capture(req *capperpb.CaptureRequest, stream capperpb.Capper_Ca
Promisc: req.GetNoPromiscuousMode(),
NumPackets: req.GetNumPackets(),
CaptureDuration: req.GetDuration().AsDuration(),
OutputFormat: req.GetOutputFormat(),
}

return s.capture(ctx, req.GetInterface(), netns, conf, stream)
Expand Down Expand Up @@ -360,7 +355,7 @@ func (s *server) capture(ctx context.Context, ifaces []string, netns string, con
defer handle.Close()
linkType := handle.LinkType()

streamHandler, err := newStreamPacketHandler(linkType, uint32(conf.Snaplen), conf.OutputFormat, stream)
streamHandler, err := newStreamPacketHandler(linkType, uint32(conf.Snaplen), stream)
if err != nil {
return fmt.Errorf("failed to create stream packet handler: %w", err)
}
Expand All @@ -379,7 +374,7 @@ func (s *server) capture(ctx context.Context, ifaces []string, netns string, con

// newStreamPacketHandler returns a PacketHandler which writes the packets as
// bytes to the given Capper_CaptureServer stream.
func newStreamPacketHandler(linkType layers.LinkType, snaplen uint32, outputFormat capperpb.PcapOutputFormat, stream capperpb.Capper_CaptureServer) (capture.PacketHandler, error) {
func newStreamPacketHandler(linkType layers.LinkType, snaplen uint32, stream capperpb.Capper_CaptureServer) (capture.PacketHandler, error) {
streamHandler := capture.PacketHandlerFunc(func(p gopacket.Packet) error {
ad, err := getCapperAncillaryData(p)
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions pkg/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,13 @@ func (c *BasicCapture) Start(ctx context.Context, handler PacketHandler) error {
packetSource := gopacket.NewPacketSource(c.handle, c.handle.LinkType())
for packet := range packetSource.PacketsCtx(packetsCtx) {
packet.Metadata().AncillaryData = append(packet.Metadata().AncillaryData, &capperpb.AncillaryPacketData{
LinkType: int64(c.handle.LinkType()),
NodeName: c.iface.Hostname,
Netns: c.iface.Netns,
NetnsInode: c.iface.NetnsInode,
IfaceName: c.iface.Name,
LinkType: int64(c.handle.LinkType()),
NodeName: c.iface.Hostname,
Netns: c.iface.Netns,
NetnsInode: c.iface.NetnsInode,
IfaceName: c.iface.Name,
Hardware: runtime.GOARCH,
OperatingSystem: runtime.GOOS,
})
err := handler.HandlePacket(packet)
if err != nil {
Expand Down
56 changes: 56 additions & 0 deletions pkg/capture/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,59 @@ func (w *PcapWriter) WritePacket(ci gopacket.CaptureInfo, data []byte) error {
func (w *PcapWriter) Flush() error {
return nil
}

type PcapNgWriter struct {
ngWriter *pcapgo.NgWriter
linkType layers.LinkType
snaplen uint32
os string
hostname string
}

func NewPcapNgWriter(w io.Writer, linkType layers.LinkType, snaplen uint32, ifaceName string, ifaceIndex int, hardware, os, hostname string) (*PcapNgWriter, error) {
intf := pcapgo.NgInterface{
Name: ifaceName,
Index: ifaceIndex,
LinkType: linkType,
SnapLength: snaplen,
OS: os,
Description: fmt.Sprintf("hostname: %q", hostname),
}
ngOpts := pcapgo.NgWriterOptions{
SectionInfo: pcapgo.NgSectionInfo{
Hardware: hardware,
OS: os,
Application: "capper",
},
}
ngWriter, err := pcapgo.NewNgWriterInterface(w, intf, ngOpts)
if err != nil {
return nil, err
}
return &PcapNgWriter{
ngWriter: ngWriter,
linkType: linkType,
snaplen: snaplen,
os: os,
hostname: hostname,
}, nil
}

func (w *PcapNgWriter) WritePacket(ci gopacket.CaptureInfo, data []byte) error {
return w.ngWriter.WritePacket(ci, data)
}

func (w *PcapNgWriter) Flush() error {
return w.ngWriter.Flush()
}

func (w *PcapNgWriter) AddInterface(name string, index int, linkType layers.LinkType) (int, error) {
return w.ngWriter.AddInterface(pcapgo.NgInterface{
Name: name,
Index: index,
LinkType: linkType,
SnapLength: w.snaplen,
OS: w.os,
Description: fmt.Sprintf("hostname: %q", w.hostname),
})
}
Loading

0 comments on commit 11e2e5b

Please sign in to comment.