forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Cherry-pick elastic#12741 to 7.2: [Packetbeat] Redis: Limit memory us…
…ed by replication (elastic#12752) This patch limits the amount of memory that can be used by outstanding requests by adding two new configuration options to the Redis protocol: - max_queue_size: Limits the total size of the queued requests (in bytes). - max_queue_length: Limits the number of requests queued. These limits apply to individual connections. The defaults are to queue up to 1MB or 20.000 requests. This is enough to limit the currently unbounded memory used by replication streams while at the same time allow for pipelining requests. Closes elastic#12657 (cherry picked from commit 4d0ea18)
- Loading branch information
Showing
9 changed files
with
342 additions
and
62 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package redis | ||
|
||
import ( | ||
"math" | ||
) | ||
|
||
// Message interface needs to be implemented by types in order to be stored | ||
// in a MessageQueue. | ||
type Message interface { | ||
// Size returns the size of the current element. | ||
Size() int | ||
} | ||
|
||
type listEntry struct { | ||
item Message | ||
next *listEntry | ||
} | ||
|
||
// MessageQueue defines a queue that automatically evicts messages based on | ||
// the total size or number of elements contained. | ||
type MessageQueue struct { | ||
head, tail *listEntry | ||
bytesAvail int64 | ||
slotsAvail int32 | ||
} | ||
|
||
// MessageQueueConfig represents the configuration for a MessageQueue. | ||
// Setting any limit to zero disables the limit. | ||
type MessageQueueConfig struct { | ||
// MaxBytes is the maximum number of bytes that can be stored in the queue. | ||
MaxBytes int64 `config:"queue_max_bytes"` | ||
|
||
// MaxMessages sets a limit on the number of messages that the queue can hold. | ||
MaxMessages int32 `config:"queue_max_messages"` | ||
} | ||
|
||
// NewMessageQueue creates a new MessageQueue with the given configuration. | ||
func NewMessageQueue(c MessageQueueConfig) (queue MessageQueue) { | ||
queue.bytesAvail = c.MaxBytes | ||
if queue.bytesAvail <= 0 { | ||
queue.bytesAvail = math.MaxInt64 | ||
} | ||
queue.slotsAvail = c.MaxMessages | ||
if queue.slotsAvail <= 0 { | ||
queue.slotsAvail = math.MaxInt32 | ||
} | ||
return queue | ||
} | ||
|
||
// Append appends a new message into the queue, returning the number of | ||
// messages evicted to make room, if any. | ||
func (ml *MessageQueue) Append(msg Message) (evicted int) { | ||
size := int64(msg.Size()) | ||
evicted = ml.adjust(size) | ||
ml.slotsAvail-- | ||
ml.bytesAvail -= size | ||
entry := &listEntry{ | ||
item: msg, | ||
} | ||
if ml.tail == nil { | ||
ml.head = entry | ||
} else { | ||
ml.tail.next = entry | ||
} | ||
ml.tail = entry | ||
return evicted | ||
} | ||
|
||
// IsEmpty returns if the MessageQueue is empty. | ||
func (ml *MessageQueue) IsEmpty() bool { | ||
return ml.head == nil | ||
} | ||
|
||
// Pop returns the oldest message in the queue, if any. | ||
func (ml *MessageQueue) Pop() Message { | ||
if ml.head == nil { | ||
return nil | ||
} | ||
|
||
msg := ml.head | ||
ml.head = msg.next | ||
if ml.head == nil { | ||
ml.tail = nil | ||
} | ||
ml.slotsAvail++ | ||
ml.bytesAvail += int64(msg.item.Size()) | ||
return msg.item | ||
} | ||
|
||
func (ml *MessageQueue) adjust(msgSize int64) (evicted int) { | ||
if ml.slotsAvail == 0 { | ||
ml.Pop() | ||
evicted++ | ||
} | ||
for ml.bytesAvail < msgSize && !ml.IsEmpty() { | ||
ml.Pop() | ||
evicted++ | ||
} | ||
return evicted | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package redis | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
type testMessage int | ||
|
||
func (t testMessage) Size() int { | ||
return int(t) | ||
} | ||
|
||
func TestMessageList_Append(t *testing.T) { | ||
for _, test := range []struct { | ||
title string | ||
maxBytes int64 | ||
maxCount int32 | ||
input []int | ||
expected []int | ||
}{ | ||
{ | ||
title: "unbounded queue", | ||
maxBytes: 0, | ||
maxCount: 0, | ||
input: []int{1, 2, 3, 4, 5}, | ||
expected: []int{1, 2, 3, 4, 5}, | ||
}, | ||
{ | ||
title: "count limited", | ||
maxBytes: 0, | ||
maxCount: 3, | ||
input: []int{1, 2, 3, 4, 5}, | ||
expected: []int{3, 4, 5}, | ||
}, | ||
{ | ||
title: "count limit boundary", | ||
maxBytes: 0, | ||
maxCount: 3, | ||
input: []int{1, 2, 3}, | ||
expected: []int{1, 2, 3}, | ||
}, | ||
{ | ||
title: "size limited", | ||
maxBytes: 10, | ||
maxCount: 0, | ||
input: []int{1, 2, 3, 4, 5}, | ||
expected: []int{4, 5}, | ||
}, | ||
{ | ||
title: "size limited boundary", | ||
maxBytes: 10, | ||
maxCount: 0, | ||
input: []int{1, 2, 3, 4}, | ||
expected: []int{1, 2, 3, 4}, | ||
}, | ||
{ | ||
title: "excess size", | ||
maxBytes: 10, | ||
maxCount: 0, | ||
input: []int{1, 2, 3, 100}, | ||
expected: []int{100}, | ||
}, | ||
{ | ||
title: "excess size 2", | ||
maxBytes: 10, | ||
maxCount: 0, | ||
input: []int{100, 1}, | ||
expected: []int{1}, | ||
}, | ||
{ | ||
title: "excess size 3", | ||
maxBytes: 10, | ||
maxCount: 0, | ||
input: []int{1, 2, 3, 4, 5, 5}, | ||
expected: []int{5, 5}, | ||
}, | ||
{ | ||
title: "both", | ||
maxBytes: 10, | ||
maxCount: 3, | ||
input: []int{3, 4, 2, 1}, | ||
expected: []int{4, 2, 1}, | ||
}, | ||
} { | ||
t.Run(test.title, func(t *testing.T) { | ||
conf := MessageQueueConfig{ | ||
MaxBytes: test.maxBytes, | ||
MaxMessages: test.maxCount, | ||
} | ||
q := NewMessageQueue(conf) | ||
for _, elem := range test.input { | ||
q.Append(testMessage(elem)) | ||
} | ||
var result []int | ||
for !q.IsEmpty() { | ||
msg := q.Pop() | ||
if !assert.NotNil(t, msg) { | ||
t.FailNow() | ||
} | ||
value, ok := msg.(testMessage) | ||
if !assert.True(t, ok) { | ||
t.FailNow() | ||
} | ||
result = append(result, value.Size()) | ||
} | ||
assert.Equal(t, test.expected, result) | ||
}) | ||
} | ||
} |
Oops, something went wrong.