Skip to content

Commit

Permalink
Add metrics for bytes added/removed to the m3msg buffer (#2656)
Browse files Browse the repository at this point in the history
Helpful for understanding how to size the buffer to survive a m3aggregator
deploy. Due to the graceful shutdown of a m3agregator, it can take almost a
minute between connections closing on the old instances and connections
start taking traffic on the new instances. If a coordinator takes 10MB/s
of traffic, it only takes 10s to fill up a 100MB buffer and start dropping requests.
  • Loading branch information
ryanhall07 authored Sep 25, 2020
1 parent 06d23e2 commit 13290be
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/msg/producer/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type bufferMetrics struct {
messageBuffered tally.Gauge
byteBuffered tally.Gauge
bufferScanBatch tally.Timer
bytesAdded tally.Counter
bytesRemoved tally.Counter
}

type counterPerNumRefBuckets struct {
Expand Down Expand Up @@ -117,6 +119,8 @@ func newBufferMetrics(
messageBuffered: scope.Gauge("message-buffered"),
byteBuffered: scope.Gauge("byte-buffered"),
bufferScanBatch: instrument.NewTimer(scope, "buffer-scan-batch", opts),
bytesAdded: scope.Counter("buffer-bytes-added"),
bytesRemoved: scope.Counter("buffer-bytes-removed"),
}
}

Expand Down Expand Up @@ -174,6 +178,7 @@ func NewBuffer(opts Options) (producer.Buffer, error) {

func (b *buffer) Add(m producer.Message) (*producer.RefCountedMessage, error) {
s := m.Size()
b.m.bytesAdded.Inc(int64(s))
if s > b.maxMessageSize {
b.m.messageTooLarge.Inc(1)
return nil, errMessageTooLarge
Expand Down Expand Up @@ -444,5 +449,6 @@ func (b *buffer) bufferLen() int {
}

func (b *buffer) subSize(rm *producer.RefCountedMessage) {
b.m.bytesRemoved.Inc(int64(rm.Size()))
b.size.Sub(rm.Size())
}

0 comments on commit 13290be

Please sign in to comment.