From 6d29bc279f3b91cf8a0d45cffddc9e55bbae7fd7 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Mon, 1 Jul 2019 16:18:27 +0200 Subject: [PATCH] [Packetbeat] Redis: Limit memory used by replication [12657] This patch limits the amount of memory that can be used by outstanding requests by adding two new configuration options to the Redis protocol: - max_queue_size: Limits the total size of the queued requests (in bytes). - max_queue_length: Limits the number of requests queued. These limits apply to individual connections. The defaults are to queue up to 1MB or 20.000 requests. This is enough to limit the currently unbounded memory used by replication streams while at the same time allow for pipelining requests. Closes #12657 --- CHANGELOG.next.asciidoc | 1 + packetbeat/_meta/beat.reference.yml | 9 ++ packetbeat/docs/packetbeat-options.asciidoc | 33 +++++ packetbeat/packetbeat.reference.yml | 9 ++ packetbeat/protos/redis/config.go | 5 + packetbeat/protos/redis/messagequeue.go | 117 +++++++++++++++++ packetbeat/protos/redis/messagequeue_test.go | 128 +++++++++++++++++++ packetbeat/protos/redis/redis.go | 91 ++++++------- packetbeat/protos/redis/redis_parse.go | 10 +- 9 files changed, 341 insertions(+), 62 deletions(-) create mode 100644 packetbeat/protos/redis/messagequeue.go create mode 100644 packetbeat/protos/redis/messagequeue_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 28abef111b40..2e7583242003 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -144,6 +144,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Prevent duplicate packet loss error messages in HTTP events. {pull}10709[10709] - Fixed a memory leak when using process monitoring under Windows. {pull}12100[12100] - Improved debug logging efficiency in PGQSL module. {issue}12150[12150] +- Limit memory usage of Redis replication sessions. {issue}12657[12657] *Winlogbeat* diff --git a/packetbeat/_meta/beat.reference.yml b/packetbeat/_meta/beat.reference.yml index a77690bb8f96..b250e5454e35 100644 --- a/packetbeat/_meta/beat.reference.yml +++ b/packetbeat/_meta/beat.reference.yml @@ -333,6 +333,15 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Max size for per-session message queue. This places a limit on the memory + # that can be used to buffer requests and responses for correlation. + #queue_max_bytes: 1048576 + + # Max number of messages for per-session message queue. This limits the number + # of requests or responses that can be buffered for correlation. Set a value + # large enough to allow for pipelining. + #queue_max_messages: 20000 + - type: thrift # Enable thrift monitoring. Default: true #enabled: true diff --git a/packetbeat/docs/packetbeat-options.asciidoc b/packetbeat/docs/packetbeat-options.asciidoc index 41a7d57f5859..782edfe29a93 100644 --- a/packetbeat/docs/packetbeat-options.asciidoc +++ b/packetbeat/docs/packetbeat-options.asciidoc @@ -1336,6 +1336,39 @@ Valid values are `sha1`, `sha256` and `md5`. If `send_certificates` is false, this setting is ignored. The default is to output SHA-1 fingerprints. +[[packetbeat-redis-options]] +=== Capture Redis traffic + +++++ +Redis +++++ + +The Redis protocol has several specific configuration options. Here is a +sample configuration for the `redis` section of the +{beatname_lc}.yml+ config file: + +[source,yaml] +------------------------------------------------------------------------------ +packetbeat.protocols: +- type: redis + ports: [6379] + queue_max_bytes: 1048576 + queue_max_messages: 20000 +------------------------------------------------------------------------------ + +==== Configuration options + +Also see <>. + +===== `queue_max_bytes` and `queue_max_messages` + +In order for request/response correlation to work, {beatname_uc} needs to +store requests in memory until a response is received. These settings impose +a limit on the number of bytes (`queue_max_bytes`) and number of requests +(`queue_max_messages`) that can be stored. These limits are per-connection. +The default is to queue up to 1MB or 20.000 requests per connection, which +allows to use request pipelining while at the same time limiting the amount +of memory consumed by replication sessions. + [[configuration-processes]] == Specify which processes to monitor diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 89d4b85bd84d..e95ef3fd4351 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -333,6 +333,15 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Max size for per-session message queue. This places a limit on the memory + # that can be used to buffer requests and responses for correlation. + #queue_max_bytes: 1048576 + + # Max number of messages for per-session message queue. This limits the number + # of requests or responses that can be buffered for correlation. Set a value + # large enough to allow for pipelining. + #queue_max_messages: 20000 + - type: thrift # Enable thrift monitoring. Default: true #enabled: true diff --git a/packetbeat/protos/redis/config.go b/packetbeat/protos/redis/config.go index b71f59c31dda..0450923ca898 100644 --- a/packetbeat/protos/redis/config.go +++ b/packetbeat/protos/redis/config.go @@ -24,6 +24,7 @@ import ( type redisConfig struct { config.ProtocolCommon `config:",inline"` + QueueLimits MessageQueueConfig `config:",inline"` } var ( @@ -31,5 +32,9 @@ var ( ProtocolCommon: config.ProtocolCommon{ TransactionTimeout: protos.DefaultTransactionExpiration, }, + QueueLimits: MessageQueueConfig{ + MaxBytes: 1024 * 1024, + MaxMessages: 20000, + }, } ) diff --git a/packetbeat/protos/redis/messagequeue.go b/packetbeat/protos/redis/messagequeue.go new file mode 100644 index 000000000000..4c01db7b2b5c --- /dev/null +++ b/packetbeat/protos/redis/messagequeue.go @@ -0,0 +1,117 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "math" +) + +// Message interface needs to be implemented by types in order to be stored +// in a MessageQueue. +type Message interface { + // Size returns the size of the current element. + Size() int +} + +type listEntry struct { + item Message + next *listEntry +} + +// MessageQueue defines a queue that automatically evict messages based on +// the total size or number of elements contained. +type MessageQueue struct { + head, tail *listEntry + bytesAvail int64 + slotsAvail int32 +} + +// MessageQueueConfig represents the configuration for a MessageQueue. +// Setting any limit to zero disables the limit. +type MessageQueueConfig struct { + // MaxBytes is the maximum number of bytes that can be stored in the queue. + MaxBytes int64 `config:"queue_max_bytes"` + + // MaxMessages sets a limit on the number of messages that the queue can hold. + MaxMessages int32 `config:"queue_max_messages"` +} + +// NewMessageQueue creates a new MessageQueue with the given configuration. +func NewMessageQueue(c MessageQueueConfig) (queue MessageQueue) { + queue.bytesAvail = c.MaxBytes + if queue.bytesAvail <= 0 { + queue.bytesAvail = math.MaxInt64 + } + queue.slotsAvail = c.MaxMessages + if queue.slotsAvail <= 0 { + queue.slotsAvail = math.MaxInt32 + } + return queue +} + +// Append appends a new message into the queue, returning the number of +// messages evicted to make room, if any. +func (ml *MessageQueue) Append(msg Message) (evicted int) { + size := int64(msg.Size()) + evicted = ml.adjust(size) + ml.slotsAvail-- + ml.bytesAvail -= size + entry := &listEntry{ + item: msg, + } + if ml.tail == nil { + ml.head = entry + } else { + ml.tail.next = entry + } + ml.tail = entry + return evicted +} + +// IsEmpty returns if the MessageQueue is empty. +func (ml *MessageQueue) IsEmpty() bool { + return ml.head == nil +} + +// Pop returns the oldest message in the queue, if any. +func (ml *MessageQueue) Pop() Message { + if ml.head == nil { + return nil + } + + msg := ml.head + ml.head = msg.next + if ml.head == nil { + ml.tail = nil + } + ml.slotsAvail++ + ml.bytesAvail += int64(msg.item.Size()) + return msg.item +} + +func (ml *MessageQueue) adjust(msgSize int64) (evicted int) { + if ml.slotsAvail == 0 { + ml.Pop() + evicted++ + } + for ml.bytesAvail < msgSize && !ml.IsEmpty() { + ml.Pop() + evicted++ + } + return evicted +} diff --git a/packetbeat/protos/redis/messagequeue_test.go b/packetbeat/protos/redis/messagequeue_test.go new file mode 100644 index 000000000000..9909c7cfad5a --- /dev/null +++ b/packetbeat/protos/redis/messagequeue_test.go @@ -0,0 +1,128 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type testMessage int + +func (t testMessage) Size() int { + return int(t) +} + +func TestMessageList_Append(t *testing.T) { + for _, test := range []struct { + title string + maxBytes int64 + maxCount int32 + input []int + expected []int + }{ + { + title: "unbounded queue", + maxBytes: 0, + maxCount: 0, + input: []int{1, 2, 3, 4, 5}, + expected: []int{1, 2, 3, 4, 5}, + }, + { + title: "count limited", + maxBytes: 0, + maxCount: 3, + input: []int{1, 2, 3, 4, 5}, + expected: []int{3, 4, 5}, + }, + { + title: "count limit boundary", + maxBytes: 0, + maxCount: 3, + input: []int{1, 2, 3}, + expected: []int{1, 2, 3}, + }, + { + title: "size limited", + maxBytes: 10, + maxCount: 0, + input: []int{1, 2, 3, 4, 5}, + expected: []int{4, 5}, + }, + { + title: "size limited boundary", + maxBytes: 10, + maxCount: 0, + input: []int{1, 2, 3, 4}, + expected: []int{1, 2, 3, 4}, + }, + { + title: "excess size", + maxBytes: 10, + maxCount: 0, + input: []int{1, 2, 3, 100}, + expected: []int{100}, + }, + { + title: "excess size 2", + maxBytes: 10, + maxCount: 0, + input: []int{100, 1}, + expected: []int{1}, + }, + { + title: "excess size 3", + maxBytes: 10, + maxCount: 0, + input: []int{1, 2, 3, 4, 5, 5}, + expected: []int{5, 5}, + }, + { + title: "both", + maxBytes: 10, + maxCount: 3, + input: []int{3, 4, 2, 1}, + expected: []int{4, 2, 1}, + }, + } { + t.Run(test.title, func(t *testing.T) { + conf := MessageQueueConfig{ + MaxBytes: test.maxBytes, + MaxMessages: test.maxCount, + } + q := NewMessageQueue(conf) + for _, elem := range test.input { + q.Append(testMessage(elem)) + } + var result []int + for !q.IsEmpty() { + msg := q.Pop() + if !assert.NotNil(t, msg) { + t.FailNow() + } + value, ok := msg.(testMessage) + if !assert.True(t, ok) { + t.FailNow() + } + result = append(result, value.Size()) + } + assert.Equal(t, test.expected, result) + }) + } +} diff --git a/packetbeat/protos/redis/redis.go b/packetbeat/protos/redis/redis.go index c043d3bc1c7d..3962c199a8e8 100644 --- a/packetbeat/protos/redis/redis.go +++ b/packetbeat/protos/redis/redis.go @@ -41,22 +41,18 @@ type stream struct { type redisConnectionData struct { streams [2]*stream - requests messageList - responses messageList -} - -type messageList struct { - head, tail *redisMessage + requests MessageQueue + responses MessageQueue } // Redis protocol plugin type redisPlugin struct { // config - ports []int - sendRequest bool - sendResponse bool - + ports []int + sendRequest bool + sendResponse bool transactionTimeout time.Duration + queueConfig MessageQueueConfig results protos.Reporter } @@ -68,6 +64,7 @@ var ( var ( unmatchedResponses = monitoring.NewInt(nil, "redis.unmatched_responses") + unmatchedRequests = monitoring.NewInt(nil, "redis.unmatched_requests") ) func init() { @@ -107,6 +104,7 @@ func (redis *redisPlugin) setFromConfig(config *redisConfig) { redis.sendRequest = config.SendRequest redis.sendResponse = config.SendResponse redis.transactionTimeout = config.TransactionTimeout + redis.queueConfig = config.QueueLimits } func (redis *redisPlugin) GetPorts() []int { @@ -131,27 +129,33 @@ func (redis *redisPlugin) Parse( ) protos.ProtocolData { defer logp.Recover("ParseRedis exception") - conn := ensureRedisConnection(private) + conn := redis.ensureRedisConnection(private) conn = redis.doParse(conn, pkt, tcptuple, dir) if conn == nil { return nil } return conn } +func (redis *redisPlugin) newConnectionData() *redisConnectionData { + return &redisConnectionData{ + requests: NewMessageQueue(redis.queueConfig), + responses: NewMessageQueue(redis.queueConfig), + } +} -func ensureRedisConnection(private protos.ProtocolData) *redisConnectionData { +func (redis *redisPlugin) ensureRedisConnection(private protos.ProtocolData) *redisConnectionData { if private == nil { - return &redisConnectionData{} + return redis.newConnectionData() } priv, ok := private.(*redisConnectionData) if !ok { logp.Warn("redis connection data type error, create new one") - return &redisConnectionData{} + return redis.newConnectionData() } if priv == nil { logp.Warn("Unexpected: redis connection data not set, create new one") - return &redisConnectionData{} + return redis.newConnectionData() } return priv @@ -245,29 +249,37 @@ func (redis *redisPlugin) handleRedis( m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { - conn.requests.append(m) // wait for response + // wait for response + if evicted := conn.requests.Append(m); evicted > 0 { + unmatchedRequests.Add(int64(evicted)) + } } else { - conn.responses.append(m) + if evicted := conn.responses.Append(m); evicted > 0 { + unmatchedResponses.Add(int64(evicted)) + } redis.correlate(conn) } } func (redis *redisPlugin) correlate(conn *redisConnectionData) { // drop responses with missing requests - if conn.requests.empty() { - for !conn.responses.empty() { + if conn.requests.IsEmpty() { + for !conn.responses.IsEmpty() { debugf("Response from unknown transaction. Ignoring") unmatchedResponses.Add(1) - conn.responses.pop() + conn.responses.Pop() } return } // merge requests with responses into transactions - for !conn.responses.empty() && !conn.requests.empty() { - requ := conn.requests.pop() - resp := conn.responses.pop() - + for !conn.responses.IsEmpty() && !conn.requests.IsEmpty() { + requ, okReq := conn.requests.Pop().(*redisMessage) + resp, okResp := conn.responses.Pop().(*redisMessage) + if !okReq || !okResp { + logp.Err("invalid type found in message queue") + continue + } if redis.results != nil { event := redis.newTransaction(requ, resp) redis.results(event) @@ -333,34 +345,3 @@ func (redis *redisPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, return private } - -func (ml *messageList) append(msg *redisMessage) { - if ml.tail == nil { - ml.head = msg - } else { - ml.tail.next = msg - } - msg.next = nil - ml.tail = msg -} - -func (ml *messageList) empty() bool { - return ml.head == nil -} - -func (ml *messageList) pop() *redisMessage { - if ml.head == nil { - return nil - } - - msg := ml.head - ml.head = ml.head.next - if ml.head == nil { - ml.tail = nil - } - return msg -} - -func (ml *messageList) last() *redisMessage { - return ml.tail -} diff --git a/packetbeat/protos/redis/redis_parse.go b/packetbeat/protos/redis/redis_parse.go index 27fbba6c181d..5ec9c3a7c286 100644 --- a/packetbeat/protos/redis/redis_parse.go +++ b/packetbeat/protos/redis/redis_parse.go @@ -44,15 +44,11 @@ type redisMessage struct { message common.NetString method common.NetString path common.NetString - - next *redisMessage } -const ( - start = iota - bulkArray - simpleMessage -) +func (msg *redisMessage) Size() int { + return len(msg.message) +} var ( empty = common.NetString("")