Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip not complete HTTP messages #953

Merged
merged 1 commit into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 48 additions & 12 deletions capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Listener struct {
sync.Mutex
Transport string // transport layer default to tcp
Activate func() error // function is used to activate the engine. it must be called before reading packets
Handles map[string]gopacket.ZeroCopyPacketDataSource
Handles map[string]packetHandle
Interfaces []pcap.Interface
loopIndex int
Reading chan bool // this channel is closed when the listener has started reading packets
Expand All @@ -57,6 +57,11 @@ type Listener struct {
quit chan struct{}
}

type packetHandle struct {
handler gopacket.ZeroCopyPacketDataSource
ips []net.IP
}

// EngineType ...
type EngineType uint8

Expand Down Expand Up @@ -117,7 +122,7 @@ func NewListener(host string, ports []uint16, transport string, engine EngineTyp
if transport != "" {
l.Transport = transport
}
l.Handles = make(map[string]gopacket.ZeroCopyPacketDataSource)
l.Handles = make(map[string]packetHandle)
l.trackResponse = trackResponse
l.closeDone = make(chan struct{})
l.quit = make(chan struct{})
Expand Down Expand Up @@ -312,12 +317,12 @@ func (l *Listener) read(handler PacketHandler) {
l.Lock()
defer l.Unlock()
for key, handle := range l.Handles {
go func(key string, hndl gopacket.ZeroCopyPacketDataSource) {
go func(key string, hndl packetHandle) {
defer l.closeHandles(key)
linkSize := 14
linkType := int(layers.LinkTypeEthernet)
if _, ok := hndl.(*pcap.Handle); ok {
linkType = int(hndl.(*pcap.Handle).LinkType())
if _, ok := hndl.handler.(*pcap.Handle); ok {
linkType = int(hndl.handler.(*pcap.Handle).LinkType())
linkSize, ok = pcapLinkTypeLength(linkType)
if !ok {
if os.Getenv("GORDEBUG") != "0" {
Expand All @@ -332,10 +337,22 @@ func (l *Listener) read(handler PacketHandler) {
case <-l.quit:
return
default:
data, ci, err := hndl.ZeroCopyReadPacketData()
data, ci, err := hndl.handler.ZeroCopyReadPacketData()
if err == nil {
pckt, err := tcp.ParsePacket(data, linkType, linkSize, &ci)
pckt, err := tcp.ParsePacket(data, linkType, linkSize, &ci, false)
if err == nil {
for _, p := range l.ports {
if pckt.DstPort == p {
for _, ip := range hndl.ips {
if pckt.DstIP.Equal(ip) {
pckt.Incoming = true
break
}
}
break
}
}

handler(pckt)
}
continue
Expand Down Expand Up @@ -367,7 +384,7 @@ func (l *Listener) closeHandles(key string) {
l.Lock()
defer l.Unlock()
if handle, ok := l.Handles[key]; ok {
if c, ok := handle.(io.Closer); ok {
if c, ok := handle.handler.(io.Closer); ok {
c.Close()
}

Expand All @@ -388,7 +405,10 @@ func (l *Listener) activatePcap() error {
msg += ("\n" + e.Error())
continue
}
l.Handles[ifi.Name] = handle
l.Handles[ifi.Name] = packetHandle{
handler: handle,
ips: interfaceIPs(ifi),
}
}
if len(l.Handles) == 0 {
return fmt.Errorf("pcap handles error:%s", msg)
Expand All @@ -409,7 +429,10 @@ func (l *Listener) activateRawSocket() error {
msg += ("\n" + e.Error())
continue
}
l.Handles[ifi.Name] = handle
l.Handles[ifi.Name] = packetHandle{
handler: handle,
ips: interfaceIPs(ifi),
}
}
if len(l.Handles) == 0 {
return fmt.Errorf("raw socket handles error:%s", msg)
Expand All @@ -433,7 +456,9 @@ func (l *Listener) activatePcapFile() (err error) {
handle.Close()
return fmt.Errorf("BPF filter error: %q, filter: %s", e, l.BPFFilter)
}
l.Handles["pcap_file"] = handle
l.Handles["pcap_file"] = packetHandle{
handler: handle,
}
return
}

Expand All @@ -456,7 +481,10 @@ func (l *Listener) activateAFPacket() error {
fmt.Println("Interface:", ifi.Name, ". BPF Filter:", l.BPFFilter)
handle.SetBPFFilter(l.BPFFilter, 64<<10)

l.Handles[ifi.Name] = handle
l.Handles[ifi.Name] = packetHandle{
handler: handle,
ips: interfaceIPs(ifi),
}
}

if len(l.Handles) == 0 {
Expand Down Expand Up @@ -524,6 +552,14 @@ func interfaceAddresses(ifi pcap.Interface) []string {
return hosts
}

func interfaceIPs(ifi pcap.Interface) []net.IP {
var ips []net.IP
for _, addr := range ifi.Addresses {
ips = append(ips, addr.IP)
}
return ips
}

func listenAll(addr string) bool {
switch addr {
case "", "0.0.0.0", "[::]", "::":
Expand Down
26 changes: 13 additions & 13 deletions input_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,17 @@ func (protocol *TCPProtocol) String() string {
// RAWInputConfig represents configuration that can be applied on raw input
type RAWInputConfig struct {
capture.PcapOptions
Expire time.Duration `json:"input-raw-expire"`
CopyBufferSize size.Size `json:"copy-buffer-size"`
Engine capture.EngineType `json:"input-raw-engine"`
TrackResponse bool `json:"input-raw-track-response"`
Protocol TCPProtocol `json:"input-raw-protocol"`
RealIPHeader string `json:"input-raw-realip-header"`
Stats bool `json:"input-raw-stats"`
quit chan bool // Channel used only to indicate goroutine should shutdown
host string
ports []uint16
Expire time.Duration `json:"input-raw-expire"`
CopyBufferSize size.Size `json:"copy-buffer-size"`
Engine capture.EngineType `json:"input-raw-engine"`
TrackResponse bool `json:"input-raw-track-response"`
Protocol TCPProtocol `json:"input-raw-protocol"`
RealIPHeader string `json:"input-raw-realip-header"`
Stats bool `json:"input-raw-stats"`
AllowIncomplete bool `json:"input-raw-allow-incomplete"`
quit chan bool // Channel used only to indicate goroutine should shutdown
host string
ports []uint16
}

// RAWInput used for intercepting traffic for given address
Expand Down Expand Up @@ -116,8 +117,7 @@ func (i *RAWInput) PluginRead() (*Message, error) {
select {
case <-i.quit:
return nil, ErrorStopped
default:
msgTCP = i.messageParser.Read()
case msgTCP = <-i.messageParser.Messages():
msg.Data = msgTCP.Data()
}

Expand Down Expand Up @@ -158,7 +158,7 @@ func (i *RAWInput) listen(address string) {
if err != nil {
log.Fatal(err)
}
i.messageParser = tcp.NewMessageParser(i.CopyBufferSize, i.Expire, Debug)
i.messageParser = tcp.NewMessageParser(i.CopyBufferSize, i.Expire, i.AllowIncomplete, Debug)

if i.Protocol == ProtocolHTTP {
i.messageParser.Start = http1StartHint
Expand Down
5 changes: 5 additions & 0 deletions input_raw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestRAWInputIPv4(t *testing.T) {
} else {
respCounter++
}

wg.Done()
})

Expand All @@ -71,14 +72,18 @@ func TestRAWInputIPv4(t *testing.T) {
emitter := NewEmitter()
defer emitter.Close()
go emitter.Start(plugins, Settings.Middleware)

// time.Sleep(time.Second)
for i := 0; i < 1; i++ {
wg.Add(2)
_, err = http.Get(addr)

if err != nil {
t.Error(err)
return
}
}

wg.Wait()
const want = 10
if reqCounter != respCounter && reqCounter != want {
Expand Down
1 change: 1 addition & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func init() {
flag.BoolVar(&Settings.Promiscuous, "input-raw-promisc", false, "enable promiscuous mode")
flag.BoolVar(&Settings.Monitor, "input-raw-monitor", false, "enable RF monitor mode")
flag.BoolVar(&Settings.Stats, "input-raw-stats", false, "enable stats generator on raw TCP messages")
flag.BoolVar(&Settings.AllowIncomplete, "input-raw-allow-incomplete", false, "If turned on Gor will record HTTP messages with missing packets")

flag.StringVar(&Settings.Middleware, "middleware", "", "Used for modifying traffic using external command")

Expand Down
58 changes: 37 additions & 21 deletions tcp/tcp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,23 +184,27 @@ type MessageParser struct {
maxSize size.Size // maximum message size, default 5mb
m map[uint64]*Message

messageExpire time.Duration // the maximum time to wait for the final packet, minimum is 100ms
End HintEnd
Start HintStart
ticker *time.Ticker
messages chan *Message
packets chan *Packet
close chan struct{} // to signal that we are able to close
messageExpire time.Duration // the maximum time to wait for the final packet, minimum is 100ms
allowIncompete bool
End HintEnd
Start HintStart
ticker *time.Ticker
messages chan *Message
packets chan *Packet
close chan struct{} // to signal that we are able to close
}

// NewMessageParser returns a new instance of message parser
func NewMessageParser(maxSize size.Size, messageExpire time.Duration, debugger Debugger) (parser *MessageParser) {
func NewMessageParser(maxSize size.Size, messageExpire time.Duration, allowIncompete bool, debugger Debugger) (parser *MessageParser) {
parser = new(MessageParser)
parser.debug = debugger
parser.messageExpire = time.Millisecond * 100
if parser.messageExpire < messageExpire {
parser.messageExpire = messageExpire

parser.messageExpire = messageExpire
if parser.messageExpire == 0 {
parser.messageExpire = time.Millisecond * 500
}

parser.allowIncompete = allowIncompete
parser.maxSize = maxSize
if parser.maxSize < 1 {
parser.maxSize = 5 << 20
Expand All @@ -224,8 +228,6 @@ func (parser *MessageParser) PacketHandler(packet *Packet) {
parser.packets <- packet
}

var processedPackets int

func (parser *MessageParser) wait() {
var (
now time.Time
Expand Down Expand Up @@ -271,6 +273,8 @@ func (parser *MessageParser) processPacket(pckt *Packet) {
}
return
}
default:
in = pckt.Incoming
}

m = new(Message)
Expand All @@ -288,23 +292,29 @@ func (parser *MessageParser) addPacket(m *Message, pckt *Packet) {
pckt.Payload = pckt.Payload[:int(parser.maxSize)-m.Length]
}
m.add(pckt)
switch {
// if one of this cases matches, we dispatch the message
case trunc >= 0:
case parser.End != nil && parser.End(m):
default:
// continue to receive packets

if trunc > 0 {
return
}

parser.Emit(m)
// If we are using protocol parsing, like HTTP, depend on its parsing func.
// For the binary procols wait for message to expire
if parser.End != nil {
if parser.End(m) {
parser.Emit(m)
}
}
}

func (parser *MessageParser) Read() *Message {
m := <-parser.messages
return m
}

func (parser *MessageParser) Messages() chan *Message {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method MessageParser.Messages should have comment or be unexported

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method MessageParser.Messages should have comment or be unexported

return parser.messages
}

func (parser *MessageParser) Emit(m *Message) {
delete(parser.m, m.packets[0].MessageID())

Expand All @@ -321,7 +331,13 @@ func (parser *MessageParser) timer(now time.Time) {
for _, m := range parser.m {
if now.Sub(m.End) > parser.messageExpire {
m.TimedOut = true
parser.Emit(m)
if parser.End == nil || parser.allowIncompete {
parser.Emit(m)
} else {
// Just remove
delete(parser.m, m.packets[0].MessageID())
m.Finalize()
}
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions tcp/tcp_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ calllers must make sure that ParsePacket has'nt returned any error before callin
function.
*/
type Packet struct {
Incoming bool
messageID uint64
SrcIP, DstIP net.IP
Version uint8
Expand All @@ -72,17 +73,17 @@ type Packet struct {
}

// ParsePacket parse raw packets
func ParsePacket(data []byte, lType, lTypeLen int, cp *gopacket.CaptureInfo) (pckt *Packet, err error) {
func ParsePacket(data []byte, lType, lTypeLen int, cp *gopacket.CaptureInfo, allowEmpty bool) (pckt *Packet, err error) {
pckt = packetPool.Get()
if err := pckt.parse(data, lType, lTypeLen, cp); err != nil {
if err := pckt.parse(data, lType, lTypeLen, cp, allowEmpty); err != nil {
packetPool.Put(pckt)
return nil, err
}

return pckt, nil
}

func (pckt *Packet) parse(data []byte, lType, lTypeLen int, cp *gopacket.CaptureInfo) error {
func (pckt *Packet) parse(data []byte, lType, lTypeLen int, cp *gopacket.CaptureInfo, allowEmpty bool) error {
pckt.Retry = 0
pckt.messageID = 0

Expand Down Expand Up @@ -158,7 +159,7 @@ func (pckt *Packet) parse(data []byte, lType, lTypeLen int, cp *gopacket.Capture
return ErrHdrLength("TCP opts")
}

if len(ndata[dOf:]) == 0 {
if !allowEmpty && len(ndata[dOf:]) == 0 {
return EmptyPacket("")
}

Expand Down
Loading