diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 34057d162f5a..6d3746b9a456 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -53,6 +53,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d *Packetbeat* - Fix mapping for some Packetbeat flow metrics that were not marked as being longs. {issue}2177[2177] +- Fix handling of messages larger than the maximum message size (10MB). {pull}2470[2470] *Topbeat* @@ -104,6 +105,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d - Add cassandra protocol analyzer to packetbeat. {pull}1959[1959] - Match connections with IPv6 addresses to processes {pull}2254[2254] - Add IP address to -devices command output {pull}2327[2327] +- Add configuration option for the maximum message size. Used to be hard-coded to 10 MB. {pull}2470[2470] *Topbeat* diff --git a/packetbeat/docs/reference/configuration/packetbeat-options.asciidoc b/packetbeat/docs/reference/configuration/packetbeat-options.asciidoc index cef30b2e73bb..f774204d7f85 100644 --- a/packetbeat/docs/reference/configuration/packetbeat-options.asciidoc +++ b/packetbeat/docs/reference/configuration/packetbeat-options.asciidoc @@ -501,6 +501,12 @@ information. If this header is present and contains a valid IP addresses, the information is used for the `real_ip` and `client_location` indexed fields. +===== max_message_size + +If an individual HTTP message is larger than this setting (in bytes), it will be trimmed +to this size. Unless this value is very small (<1.5K), Packetbeat is able to still correctly +follow the transaction and create an event for it. The default is 10485760 (10 MB). + ==== AMQP Configuration Options diff --git a/packetbeat/etc/beat.full.yml b/packetbeat/etc/beat.full.yml index 079e4414bdce..32f421e8b329 100644 --- a/packetbeat/etc/beat.full.yml +++ b/packetbeat/etc/beat.full.yml @@ -205,6 +205,10 @@ packetbeat.protocols.http: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Maximum message size. If an HTTP message is larger than this, it will + # be trimmed to this size. Default is 10 MB. + #max_message_size: 10485760 + packetbeat.protocols.memcache: # Enable memcache monitoring. Default: true #enabled: true diff --git a/packetbeat/packetbeat.full.yml b/packetbeat/packetbeat.full.yml index 14910b624367..e1928247b5c4 100644 --- a/packetbeat/packetbeat.full.yml +++ b/packetbeat/packetbeat.full.yml @@ -205,6 +205,10 @@ packetbeat.protocols.http: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Maximum message size. If an HTTP message is larger than this, it will + # be trimmed to this size. Default is 10 MB. + #max_message_size: 10485760 + packetbeat.protocols.memcache: # Enable memcache monitoring. Default: true #enabled: true diff --git a/packetbeat/protos/http/config.go b/packetbeat/protos/http/config.go index 71a256a33c04..fa1973bce385 100644 --- a/packetbeat/protos/http/config.go +++ b/packetbeat/protos/http/config.go @@ -3,6 +3,7 @@ package http import ( "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/protos/tcp" ) type httpConfig struct { @@ -14,6 +15,7 @@ type httpConfig struct { Include_body_for []string `config:"include_body_for"` Hide_keywords []string `config:"hide_keywords"` Redact_authorization bool `config:"redact_authorization"` + MaxMessageSize int `config:"max_message_size"` } var ( @@ -21,5 +23,6 @@ var ( ProtocolCommon: config.ProtocolCommon{ TransactionTimeout: protos.DefaultTransactionExpiration, }, + MaxMessageSize: tcp.TCP_MAX_DATA_IN_STREAM, } ) diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index cfab0d9e7b16..13736f7752d4 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -68,6 +68,7 @@ type HTTP struct { HideKeywords []string RedactAuthorization bool IncludeBodyFor []string + MaxMessageSize int parserConfig parserConfig @@ -124,6 +125,7 @@ func (http *HTTP) setFromConfig(config *httpConfig) { http.parserConfig.RealIPHeader = strings.ToLower(config.Real_ip_header) http.transactionTimeout = config.TransactionTimeout http.IncludeBodyFor = config.Include_body_for + http.MaxMessageSize = config.MaxMessageSize if config.Send_all_headers { http.parserConfig.SendHeaders = true @@ -265,19 +267,21 @@ func (http *HTTP) doParse( detailedf("Payload received: [%s]", pkt.Payload) } + extraMsgSize := 0 // size of a "seen" packet for which we don't store the actual bytes + st := conn.Streams[dir] if st == nil { st = newStream(pkt, tcptuple) conn.Streams[dir] = st } else { // concatenate bytes - st.data = append(st.data, pkt.Payload...) - if len(st.data) > tcp.TCP_MAX_DATA_IN_STREAM { + if len(st.data)+len(pkt.Payload) > http.MaxMessageSize { if isDebug { - debugf("Stream data too large, dropping TCP stream") + debugf("Stream data too large, ignoring message") } - conn.Streams[dir] = nil - return conn + extraMsgSize = len(pkt.Payload) + } else { + st.data = append(st.data, pkt.Payload...) } } @@ -287,7 +291,7 @@ func (http *HTTP) doParse( } parser := newParser(&http.parserConfig) - ok, complete := parser.parse(st) + ok, complete := parser.parse(st, extraMsgSize) if !ok { // drop this tcp stream. Will retry parsing with the next // segment in it @@ -322,6 +326,7 @@ func newStream(pkt *protos.Packet, tcptuple *common.TcpTuple) *stream { func (http *HTTP) ReceivedFin(tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData) protos.ProtocolData { + debugf("Received FIN") conn := getHTTPConnection(private) if conn == nil { return private @@ -583,7 +588,7 @@ func parseCookieValue(raw string) string { func (http *HTTP) extractBody(m *message) []byte { body := []byte{} - if len(m.ContentType) == 0 || http.shouldIncludeInBody(m.ContentType) { + if len(m.ContentType) > 0 && http.shouldIncludeInBody(m.ContentType) { if len(m.chunkedBody) > 0 { body = append(body, m.chunkedBody...) } else { diff --git a/packetbeat/protos/http/http_parser.go b/packetbeat/protos/http/http_parser.go index 4f9021a8c463..f48bf692d966 100644 --- a/packetbeat/protos/http/http_parser.go +++ b/packetbeat/protos/http/http_parser.go @@ -16,7 +16,6 @@ type message struct { Ts time.Time hasContentLength bool headerOffset int - bodyOffset int version version connection common.NetString chunkedLength int @@ -46,9 +45,10 @@ type message struct { Notes []string - //Timing - start int - end int + //Offsets + start int + end int + bodyOffset int next *message } @@ -87,9 +87,18 @@ func newParser(config *parserConfig) *parser { return &parser{config: config} } -func (parser *parser) parse(s *stream) (bool, bool) { +func (parser *parser) parse(s *stream, extraMsgSize int) (bool, bool) { m := s.message + if extraMsgSize > 0 { + // A packet of extraMsgSize size was seen, but we don't have + // its actual bytes. This is only usable in the `stateBody` state. + if s.parseState != stateBody { + return false, false + } + return parser.eatBody(s, m, extraMsgSize) + } + for s.parseOffset < len(s.data) { switch s.parseState { case stateStart: @@ -363,14 +372,14 @@ func (parser *parser) parseHeader(m *message, data []byte) (bool, bool, int) { func (*parser) parseBody(s *stream, m *message) (ok, complete bool) { if isDebug { - debugf("eat body: %d", s.parseOffset) + debugf("parseBody body: %d", s.parseOffset) } if !m.hasContentLength && (bytes.Equal(m.connection, constClose) || (isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive))) { // HTTP/1.0 no content length. Add until the end of the connection if isDebug { - debugf("close connection, %d", len(s.data)-s.parseOffset) + debugf("http conn close, received %d", len(s.data)-s.parseOffset) } s.bodyReceived += (len(s.data) - s.parseOffset) m.ContentLength += (len(s.data) - s.parseOffset) @@ -391,6 +400,36 @@ func (*parser) parseBody(s *stream, m *message) (ok, complete bool) { } } +// eatBody acts as if size bytes were received, without having access to +// those bytes. +func (*parser) eatBody(s *stream, m *message, size int) (ok, complete bool) { + if isDebug { + debugf("eatBody body: %d", s.parseOffset) + } + if !m.hasContentLength && (bytes.Equal(m.connection, constClose) || + (isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive))) { + + // HTTP/1.0 no content length. Add until the end of the connection + if isDebug { + debugf("http conn close, received %d", size) + } + s.bodyReceived += size + m.ContentLength += size + return true, false + } else if size >= m.ContentLength-s.bodyReceived { + s.bodyReceived += (m.ContentLength - s.bodyReceived) + m.end = s.parseOffset + m.Size = uint64(m.bodyOffset-m.start) + uint64(m.ContentLength) + return true, true + } else { + s.bodyReceived += size + if isDebug { + debugf("bodyReceived: %d", s.bodyReceived) + } + return true, false + } +} + func (*parser) parseBodyChunkedStart(s *stream, m *message) (cont, ok, complete bool) { // read hexa length i := bytes.Index(s.data[s.parseOffset:], constCRLF) diff --git a/packetbeat/protos/http/http_test.go b/packetbeat/protos/http/http_test.go index 1a164fc75e54..8168a6ea4407 100644 --- a/packetbeat/protos/http/http_test.go +++ b/packetbeat/protos/http/http_test.go @@ -45,7 +45,7 @@ func (tp *testParser) parse() (*message, bool, bool) { } parser := newParser(&tp.http.parserConfig) - ok, complete := parser.parse(st) + ok, complete := parser.parse(st, 0) return st.message, ok, complete } @@ -63,9 +63,9 @@ func testParse(http *HTTP, data string) (*message, bool, bool) { return tp.parse() } -func testParseStream(http *HTTP, st *stream) (bool, bool) { +func testParseStream(http *HTTP, st *stream, extraLen int) (bool, bool) { parser := newParser(&http.parserConfig) - return parser.parse(st) + return parser.parse(st, extraLen) } func TestHttpParser_simpleResponse(t *testing.T) { @@ -165,8 +165,80 @@ func TestHttpParser_Request_ContentLength_0(t *testing.T) { assert.True(t, complete) } +func TestHttpParser_eatBody(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"http", "httpdetailed"}) + } + + http := httpModForTests() + http.parserConfig.SendHeaders = true + http.parserConfig.SendAllHeaders = true + + data := []byte("POST / HTTP/1.1\r\n" + + "user-agent: curl/7.35.0\r\n" + + "host: localhost:9000\r\n" + + "accept: */*\r\n" + + "authorization: Company 1\r\n" + + "content-length: 20\r\n" + + "connection: close\r\n" + + "\r\n" + + "0123456789") + + st := &stream{data: data, message: new(message)} + ok, complete := testParseStream(http, st, 0) + assert.True(t, ok) + assert.False(t, complete) + assert.Equal(t, st.bodyReceived, 10) + + ok, complete = testParseStream(http, st, 5) + assert.True(t, ok) + assert.False(t, complete) + assert.Equal(t, st.bodyReceived, 15) + + ok, complete = testParseStream(http, st, 5) + assert.True(t, ok) + assert.True(t, complete) + assert.Equal(t, st.bodyReceived, 20) + assert.Equal(t, st.message.end, len(data)) +} + +func TestHttpParser_eatBody_connclose(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"http", "httpdetailed"}) + } + + http := httpModForTests() + http.parserConfig.SendHeaders = true + http.parserConfig.SendAllHeaders = true + + data := []byte("HTTP/1.1 200 ok\r\n" + + "user-agent: curl/7.35.0\r\n" + + "host: localhost:9000\r\n" + + "accept: */*\r\n" + + "authorization: Company 1\r\n" + + "connection: close\r\n" + + "\r\n" + + "0123456789") + + st := &stream{data: data, message: new(message)} + ok, complete := testParseStream(http, st, 0) + assert.True(t, ok) + assert.False(t, complete) + assert.Equal(t, st.bodyReceived, 10) + + ok, complete = testParseStream(http, st, 5) + assert.True(t, ok) + assert.False(t, complete) + assert.Equal(t, st.bodyReceived, 15) + + ok, complete = testParseStream(http, st, 5) + assert.True(t, ok) + assert.False(t, complete) + assert.Equal(t, st.bodyReceived, 20) +} + func TestHttpParser_splitResponse(t *testing.T) { - data1 := "HTTP/1.1 200 OK\r\n" + + data1 := "HTTP/1.1 200 ok\r\n" + "Date: Tue, 14 Aug 2012 22:31:45 GMT\r\n" + "Expires: -1\r\n" + "Cache-Control: private, max-age=0\r\n" + @@ -672,7 +744,7 @@ func TestHttpParser_censorPasswordGET(t *testing.T) { st := &stream{data: data1, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) if !ok { t.Errorf("Parsing returned error") } @@ -722,7 +794,7 @@ func TestHttpParser_RedactAuthorization(t *testing.T) { st := &stream{data: data, message: new(message)} - ok, _ := testParseStream(http, st) + ok, _ := testParseStream(http, st, 0) st.message.Raw = st.data[st.message.start:] http.hideHeaders(st.message) @@ -759,7 +831,7 @@ func TestHttpParser_RedactAuthorization_raw(t *testing.T) { st := &stream{data: data, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) st.message.Raw = st.data[st.message.start:] http.hideHeaders(st.message) @@ -796,7 +868,7 @@ func TestHttpParser_RedactAuthorization_Proxy_raw(t *testing.T) { st := &stream{data: data, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) st.message.Raw = st.data[st.message.start:] http.hideHeaders(st.message) @@ -905,7 +977,7 @@ func Test_gap_in_headers(t *testing.T) { "Content-Type: text/html; charset=UTF-8\r\n") st := &stream{data: data1, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) assert.Equal(t, true, ok) assert.Equal(t, false, complete) @@ -934,7 +1006,7 @@ func Test_gap_in_body(t *testing.T) { "xxxxxxxxxxxxxxxxxxxx") st := &stream{data: data1, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) assert.Equal(t, true, ok) assert.Equal(t, false, complete) @@ -966,7 +1038,7 @@ func Test_gap_in_body_http1dot0(t *testing.T) { "xxxxxxxxxxxxxxxxxxxx") st := &stream{data: data1, message: new(message)} - ok, complete := testParseStream(http, st) + ok, complete := testParseStream(http, st, 0) assert.Equal(t, true, ok) assert.Equal(t, false, complete) @@ -1100,7 +1172,7 @@ func benchmarkHTTPMessage(b *testing.B, data []byte) { for i := 0; i < b.N; i++ { stream := &stream{data: data, message: new(message)} - ok, complete := parser.parse(stream) + ok, complete := parser.parse(stream, 0) if !ok || !complete { b.Errorf("failed to parse message") } @@ -1159,13 +1231,13 @@ func BenchmarkHTTPSplitResponse(b *testing.B) { for i := 0; i < b.N; i++ { stream := &stream{data: data1, message: new(message)} - ok, complete := parser.parse(stream) + ok, complete := parser.parse(stream, 0) if !ok || complete { b.Errorf("parse failure. Expected message to be incomplete, but no parse failures") } stream.data = append(stream.data, data2...) - ok, complete = parser.parse(stream) + ok, complete = parser.parse(stream, 0) if !ok || !complete { b.Errorf("failed to parse message") } diff --git a/packetbeat/tests/system/config/packetbeat.yml.j2 b/packetbeat/tests/system/config/packetbeat.yml.j2 index 79db5b9bf83d..916f4ee672fd 100644 --- a/packetbeat/tests/system/config/packetbeat.yml.j2 +++ b/packetbeat/tests/system/config/packetbeat.yml.j2 @@ -75,6 +75,7 @@ packetbeat.protocols.http: {%- endfor -%} ] {%- endif %} +{%- if http_max_message_size %} max_message_size: {{ http_max_message_size }} {%- endif %} packetbeat.protocols.memcache: ports: [{{ memcache_ports|default([11211])|join(", ") }}] diff --git a/packetbeat/tests/system/pcaps/http_get_2k_file.pcap b/packetbeat/tests/system/pcaps/http_get_2k_file.pcap new file mode 100644 index 000000000000..7f0d8069778a Binary files /dev/null and b/packetbeat/tests/system/pcaps/http_get_2k_file.pcap differ diff --git a/packetbeat/tests/system/test_0063_http_body.py b/packetbeat/tests/system/test_0063_http_body.py index 2bcf84e54ab2..4beede566cf0 100644 --- a/packetbeat/tests/system/test_0063_http_body.py +++ b/packetbeat/tests/system/test_0063_http_body.py @@ -90,3 +90,25 @@ def test_wrong_content_type(self): assert "request" not in o assert "response" not in o + + def test_large_body(self): + """ + Checks that the transaction is still created if the + message is larger than the max_message_size. + """ + self.render_config_template( + http_include_body_for=["binary"], + http_ports=[8000], + http_max_message_size=1024 + ) + self.run_packetbeat(pcap="http_get_2k_file.pcap", + debug_selectors=["*"]) + objs = self.read_output() + + assert len(objs) == 1 + o = objs[0] + print len(o["http.response.body"]) + + # response body should be included but trimmed + assert len(o["http.response.body"]) < 2000 + assert len(o["http.response.body"]) > 500