Skip to content

Commit

Permalink
Merge pull request #2470 from tsg/packetbeat_large_messages
Browse files Browse the repository at this point in the history
Improve handling of messages larger than 10MB
  • Loading branch information
monicasarbu authored Sep 12, 2016
2 parents 51ba275 + e2754c0 commit df0673d
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 28 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d

*Packetbeat*
- Fix mapping for some Packetbeat flow metrics that were not marked as being longs. {issue}2177[2177]
- Fix handling of messages larger than the maximum message size (10MB). {pull}2470[2470]

*Topbeat*

Expand Down Expand Up @@ -104,6 +105,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
- Add cassandra protocol analyzer to packetbeat. {pull}1959[1959]
- Match connections with IPv6 addresses to processes {pull}2254[2254]
- Add IP address to -devices command output {pull}2327[2327]
- Add configuration option for the maximum message size. Used to be hard-coded to 10 MB. {pull}2470[2470]

*Topbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@ information. If this header is present and contains a valid IP address, the
information is used for the `real_ip` and `client_location` indexed
fields.

===== max_message_size

If an individual HTTP message is larger than this setting (in bytes), it will be trimmed
to this size. Unless this value is very small (<1.5K), Packetbeat is still able to correctly
follow the transaction and create an event for it. The default is 10485760 (10 MB).


==== AMQP Configuration Options

Expand Down
4 changes: 4 additions & 0 deletions packetbeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ packetbeat.protocols.http:
# incoming responses, but sent to Elasticsearch immediately.
#transaction_timeout: 10s

# Maximum message size. If an HTTP message is larger than this, it will
# be trimmed to this size. Default is 10 MB.
#max_message_size: 10485760

packetbeat.protocols.memcache:
# Enable memcache monitoring. Default: true
#enabled: true
Expand Down
4 changes: 4 additions & 0 deletions packetbeat/packetbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ packetbeat.protocols.http:
# incoming responses, but sent to Elasticsearch immediately.
#transaction_timeout: 10s

# Maximum message size. If an HTTP message is larger than this, it will
# be trimmed to this size. Default is 10 MB.
#max_message_size: 10485760

packetbeat.protocols.memcache:
# Enable memcache monitoring. Default: true
#enabled: true
Expand Down
3 changes: 3 additions & 0 deletions packetbeat/protos/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
)

type httpConfig struct {
Expand All @@ -14,12 +15,14 @@ type httpConfig struct {
Include_body_for []string `config:"include_body_for"`
Hide_keywords []string `config:"hide_keywords"`
Redact_authorization bool `config:"redact_authorization"`
MaxMessageSize int `config:"max_message_size"`
}

var (
	// defaultConfig holds the settings applied when an option is not
	// explicitly set in the http protocol configuration.
	defaultConfig = httpConfig{
		ProtocolCommon: config.ProtocolCommon{
			TransactionTimeout: protos.DefaultTransactionExpiration,
		},
		// Default cap on buffered bytes per HTTP message; reuses the
		// previously hard-coded TCP stream limit (10 MB per the
		// changelog) so behavior is unchanged unless configured.
		MaxMessageSize: tcp.TCP_MAX_DATA_IN_STREAM,
	}
)
19 changes: 12 additions & 7 deletions packetbeat/protos/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type HTTP struct {
HideKeywords []string
RedactAuthorization bool
IncludeBodyFor []string
MaxMessageSize int

parserConfig parserConfig

Expand Down Expand Up @@ -124,6 +125,7 @@ func (http *HTTP) setFromConfig(config *httpConfig) {
http.parserConfig.RealIPHeader = strings.ToLower(config.Real_ip_header)
http.transactionTimeout = config.TransactionTimeout
http.IncludeBodyFor = config.Include_body_for
http.MaxMessageSize = config.MaxMessageSize

if config.Send_all_headers {
http.parserConfig.SendHeaders = true
Expand Down Expand Up @@ -265,19 +267,21 @@ func (http *HTTP) doParse(
detailedf("Payload received: [%s]", pkt.Payload)
}

extraMsgSize := 0 // size of a "seen" packet for which we don't store the actual bytes

st := conn.Streams[dir]
if st == nil {
st = newStream(pkt, tcptuple)
conn.Streams[dir] = st
} else {
// concatenate bytes
st.data = append(st.data, pkt.Payload...)
if len(st.data) > tcp.TCP_MAX_DATA_IN_STREAM {
if len(st.data)+len(pkt.Payload) > http.MaxMessageSize {
if isDebug {
debugf("Stream data too large, dropping TCP stream")
debugf("Stream data too large, ignoring message")
}
conn.Streams[dir] = nil
return conn
extraMsgSize = len(pkt.Payload)
} else {
st.data = append(st.data, pkt.Payload...)
}
}

Expand All @@ -287,7 +291,7 @@ func (http *HTTP) doParse(
}

parser := newParser(&http.parserConfig)
ok, complete := parser.parse(st)
ok, complete := parser.parse(st, extraMsgSize)
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
Expand Down Expand Up @@ -322,6 +326,7 @@ func newStream(pkt *protos.Packet, tcptuple *common.TcpTuple) *stream {
func (http *HTTP) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {

debugf("Received FIN")
conn := getHTTPConnection(private)
if conn == nil {
return private
Expand Down Expand Up @@ -583,7 +588,7 @@ func parseCookieValue(raw string) string {
func (http *HTTP) extractBody(m *message) []byte {
body := []byte{}

if len(m.ContentType) == 0 || http.shouldIncludeInBody(m.ContentType) {
if len(m.ContentType) > 0 && http.shouldIncludeInBody(m.ContentType) {
if len(m.chunkedBody) > 0 {
body = append(body, m.chunkedBody...)
} else {
Expand Down
53 changes: 46 additions & 7 deletions packetbeat/protos/http/http_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type message struct {
Ts time.Time
hasContentLength bool
headerOffset int
bodyOffset int
version version
connection common.NetString
chunkedLength int
Expand Down Expand Up @@ -46,9 +45,10 @@ type message struct {

Notes []string

//Timing
start int
end int
//Offsets
start int
end int
bodyOffset int

next *message
}
Expand Down Expand Up @@ -87,9 +87,18 @@ func newParser(config *parserConfig) *parser {
return &parser{config: config}
}

func (parser *parser) parse(s *stream) (bool, bool) {
func (parser *parser) parse(s *stream, extraMsgSize int) (bool, bool) {
m := s.message

if extraMsgSize > 0 {
// A packet of extraMsgSize size was seen, but we don't have
// its actual bytes. This is only usable in the `stateBody` state.
if s.parseState != stateBody {
return false, false
}
return parser.eatBody(s, m, extraMsgSize)
}

for s.parseOffset < len(s.data) {
switch s.parseState {
case stateStart:
Expand Down Expand Up @@ -363,14 +372,14 @@ func (parser *parser) parseHeader(m *message, data []byte) (bool, bool, int) {

func (*parser) parseBody(s *stream, m *message) (ok, complete bool) {
if isDebug {
debugf("eat body: %d", s.parseOffset)
debugf("parseBody body: %d", s.parseOffset)
}
if !m.hasContentLength && (bytes.Equal(m.connection, constClose) ||
(isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive))) {

// HTTP/1.0 no content length. Add until the end of the connection
if isDebug {
debugf("close connection, %d", len(s.data)-s.parseOffset)
debugf("http conn close, received %d", len(s.data)-s.parseOffset)
}
s.bodyReceived += (len(s.data) - s.parseOffset)
m.ContentLength += (len(s.data) - s.parseOffset)
Expand All @@ -391,6 +400,36 @@ func (*parser) parseBody(s *stream, m *message) (ok, complete bool) {
}
}

// eatBody accounts for size body bytes that were observed on the wire but
// whose contents are not available (they were dropped to honor the maximum
// message size). It mirrors parseBody's bookkeeping without touching s.data.
// ok reports whether parsing may continue; complete reports whether the
// message body has been fully accounted for.
func (*parser) eatBody(s *stream, m *message, size int) (ok, complete bool) {
	if isDebug {
		debugf("eatBody body: %d", s.parseOffset)
	}

	// "Read until close" case: no Content-Length header, and the
	// connection is not being kept alive (explicit "close", or HTTP/1.0
	// without an explicit "keep-alive").
	untilClose := !m.hasContentLength &&
		(bytes.Equal(m.connection, constClose) ||
			(isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive)))
	if untilClose {
		if isDebug {
			debugf("http conn close, received %d", size)
		}
		s.bodyReceived += size
		m.ContentLength += size
		return true, false
	}

	if remaining := m.ContentLength - s.bodyReceived; size >= remaining {
		// The declared body length is now fully consumed; finalize the
		// message offsets and total size.
		s.bodyReceived = m.ContentLength
		m.end = s.parseOffset
		m.Size = uint64(m.bodyOffset-m.start) + uint64(m.ContentLength)
		return true, true
	}

	// Partial body: record progress and wait for more packets.
	s.bodyReceived += size
	if isDebug {
		debugf("bodyReceived: %d", s.bodyReceived)
	}
	return true, false
}

func (*parser) parseBodyChunkedStart(s *stream, m *message) (cont, ok, complete bool) {
// read hexa length
i := bytes.Index(s.data[s.parseOffset:], constCRLF)
Expand Down
Loading

0 comments on commit df0673d

Please sign in to comment.