Skip to content

Commit

Permalink
[coordinator] Add Carbon ingest latency metrics (#3045)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Dec 24, 2020
1 parent d6c526d commit de4894f
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 65 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ linters-settings:
lll:
# max line length, lines longer will be reported. Default is 120.
# '\t' is counted as 1 character by default, and can be changed with the tab-width option
line-length: 100
line-length: 120
# tab width in spaces. Default to 1.
tab-width: 1
unused:
Expand Down
59 changes: 42 additions & 17 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,19 @@ func NewIngester(
}
})

scope := opts.InstrumentOptions.MetricsScope()
metrics, err := newCarbonIngesterMetrics(scope)
if err != nil {
return nil, err
}

ingester := &ingester{
downsamplerAndWriter: downsamplerAndWriter,
opts: opts,
logger: opts.InstrumentOptions.Logger(),
tagOpts: tagOpts,
metrics: newCarbonIngesterMetrics(
opts.InstrumentOptions.MetricsScope()),
lineResourcesPool: resourcePool,
metrics: metrics,
lineResourcesPool: resourcePool,
}
// No need to retain watch as NamespaceWatcher.Close() will handle closing any watches
// generated by creating listeners.
Expand Down Expand Up @@ -280,6 +285,7 @@ func (i *ingester) Handle(conn net.Conn) {

logger.Debug("handling new carbon ingestion connection")
for s.Scan() {
received := time.Now()
name, timestamp, value := s.Metric()

resources := i.getLineResources()
Expand All @@ -292,6 +298,19 @@ func (i *ingester) Handle(conn net.Conn) {
if ok {
i.metrics.success.Inc(1)
}

now := time.Now()

// Always record age regardless of success/failure since
// sometimes errors can be due to how old the metrics are
// and not recording age would obscure this visibility from
// the metrics of how fresh/old the incoming metrics are.
age := now.Sub(timestamp)
i.metrics.ingestLatency.RecordDuration(age)

// Also record write latency (not relative to metric timestamp).
i.metrics.writeLatency.RecordDuration(now.Sub(received))

// The contract is that after the DownsamplerAndWriter returns, any resources
// that it needed to hold onto have already been copied.
i.putLineResources(resources)
Expand Down Expand Up @@ -401,10 +420,8 @@ func (i *ingester) writeWithOptions(
return err
}

err = i.downsamplerAndWriter.Write(
ctx, tags, resources.datapoints, xtime.Second, nil, opts,
)

err = i.downsamplerAndWriter.Write(ctx, tags, resources.datapoints,
xtime.Second, nil, opts)
if err != nil {
i.logger.Error("err writing carbon metric",
zap.String("name", string(resources.name)), zap.Error(err))
Expand All @@ -419,18 +436,26 @@ func (i *ingester) Close() {
// We don't maintain any state in-between connections so there is nothing to do here.
}

func newCarbonIngesterMetrics(m tally.Scope) carbonIngesterMetrics {
return carbonIngesterMetrics{
success: m.Counter("success"),
err: m.Counter("error"),
malformed: m.Counter("malformed"),
}
// carbonIngesterMetrics holds the metrics emitted by the carbon ingester
// while handling incoming carbon line-protocol metrics.
type carbonIngesterMetrics struct {
	// success counts lines successfully written.
	success tally.Counter
	// err counts lines that failed to write.
	err tally.Counter
	// malformed counts lines that could not be parsed.
	malformed tally.Counter
	// ingestLatency records the age of each datapoint, i.e. now minus the
	// metric's own timestamp, recorded for both successes and failures.
	ingestLatency tally.Histogram
	// writeLatency records the wall-clock time spent writing a line locally,
	// i.e. not relative to the metric's timestamp.
	writeLatency tally.Histogram
}

type carbonIngesterMetrics struct {
success tally.Counter
err tally.Counter
malformed tally.Counter
// newCarbonIngesterMetrics constructs the carbon ingester metrics, wiring the
// write and ingest latency histograms with the shared coordinator latency
// buckets. Returns an error if the bucket construction fails.
func newCarbonIngesterMetrics(scope tally.Scope) (carbonIngesterMetrics, error) {
	latencyBuckets, err := ingest.NewLatencyBuckets()
	if err != nil {
		return carbonIngesterMetrics{}, err
	}

	writeScope := scope.SubScope("write")
	ingestScope := scope.SubScope("ingest")

	m := carbonIngesterMetrics{
		success:       scope.Counter("success"),
		err:           scope.Counter("error"),
		malformed:     scope.Counter("malformed"),
		writeLatency:  writeScope.Histogram("latency", latencyBuckets.WriteLatencyBuckets),
		ingestLatency: ingestScope.Histogram("latency", latencyBuckets.IngestLatencyBuckets),
	}
	return m, nil
}

// GenerateTagsFromName accepts a carbon metric name and blows it up into a list of
Expand Down
90 changes: 90 additions & 0 deletions src/cmd/services/m3coordinator/ingest/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ingest

import (
"time"

"github.com/uber-go/tally"
)

// LatencyBuckets are a set of latency buckets useful for measuring things.
type LatencyBuckets struct {
	// WriteLatencyBuckets are buckets for measuring local write latency
	// (time from receiving a sample to completing its write).
	WriteLatencyBuckets tally.DurationBuckets
	// IngestLatencyBuckets are buckets for measuring ingest latency
	// (time from datapoint/sample creation to ingestion); they extend
	// further (up to 24h) since incoming samples can carry old timestamps.
	IngestLatencyBuckets tally.DurationBuckets
}

// NewLatencyBuckets returns write and ingest latency buckets useful for
// measuring ingest latency (i.e. time from datapoint/sample created to time
// ingested) and write latency (i.e. time from received a sample from remote
// source to completion of that write locally).
func NewLatencyBuckets() (LatencyBuckets, error) {
	// Piecewise linear ranges: fine resolution for sub-second latencies,
	// progressively coarser resolution further out.
	upTo1sBuckets, err := linearDurationBuckets(0, 100*time.Millisecond, 10, false)
	if err != nil {
		return LatencyBuckets{}, err
	}

	upTo10sBuckets, err := linearDurationBuckets(time.Second, 500*time.Millisecond, 18, false)
	if err != nil {
		return LatencyBuckets{}, err
	}

	upTo60sBuckets, err := linearDurationBuckets(10*time.Second, 5*time.Second, 11, false)
	if err != nil {
		return LatencyBuckets{}, err
	}

	// Drop the first 0s bucket to get 5 min aligned buckets.
	upTo60mBuckets, err := linearDurationBuckets(0, 5*time.Minute, 12, true)
	if err != nil {
		return LatencyBuckets{}, err
	}

	upTo6hBuckets, err := linearDurationBuckets(time.Hour, 30*time.Minute, 12, false)
	if err != nil {
		return LatencyBuckets{}, err
	}

	// Drop the first 6h bucket to get 1 hour aligned buckets.
	upTo24hBuckets, err := linearDurationBuckets(6*time.Hour, time.Hour, 19, true)
	if err != nil {
		return LatencyBuckets{}, err
	}

	// Write latency covers up to an hour.
	var writeLatencyBuckets tally.DurationBuckets
	writeLatencyBuckets = append(writeLatencyBuckets, upTo1sBuckets...)
	writeLatencyBuckets = append(writeLatencyBuckets, upTo10sBuckets...)
	writeLatencyBuckets = append(writeLatencyBuckets, upTo60sBuckets...)
	writeLatencyBuckets = append(writeLatencyBuckets, upTo60mBuckets...)

	// Ingest latency shares the write buckets and extends up to 24h since
	// incoming datapoints can carry arbitrarily old timestamps.
	var ingestLatencyBuckets tally.DurationBuckets
	ingestLatencyBuckets = append(ingestLatencyBuckets, writeLatencyBuckets...)
	ingestLatencyBuckets = append(ingestLatencyBuckets, upTo6hBuckets...)
	ingestLatencyBuckets = append(ingestLatencyBuckets, upTo24hBuckets...)

	return LatencyBuckets{
		WriteLatencyBuckets:  writeLatencyBuckets,
		IngestLatencyBuckets: ingestLatencyBuckets,
	}, nil
}

// linearDurationBuckets returns count linearly spaced duration buckets
// starting at start with the given width. When dropFirst is set the first
// bucket is trimmed so the remaining buckets stay aligned with the upper
// bound of the preceding range.
func linearDurationBuckets(
	start, width time.Duration,
	count int,
	dropFirst bool,
) (tally.DurationBuckets, error) {
	buckets, err := tally.LinearDurationBuckets(start, width, count)
	if err != nil {
		return nil, err
	}
	if dropFirst {
		return buckets[1:], nil
	}
	return buckets, nil
}
51 changes: 51 additions & 0 deletions src/cmd/services/m3coordinator/ingest/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ingest

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

// TestLatencyBuckets sanity checks the number and the exact values of the
// write and ingest latency buckets.
func TestLatencyBuckets(t *testing.T) {
	buckets, err := NewLatencyBuckets()
	require.NoError(t, err)

	writeDurations := buckets.WriteLatencyBuckets.AsDurations()
	ingestDurations := buckets.IngestLatencyBuckets.AsDurations()

	// NB(r): Bucket length is tested just to sanity check how many buckets we are creating
	require.Equal(t, 50, len(writeDurations))

	// NB(r): Bucket values are tested to sanity check they look right
	// nolint: lll
	require.Equal(t,
		"[0s 100ms 200ms 300ms 400ms 500ms 600ms 700ms 800ms 900ms 1s 1.5s 2s 2.5s 3s 3.5s 4s 4.5s 5s 5.5s 6s 6.5s 7s 7.5s 8s 8.5s 9s 9.5s 10s 15s 20s 25s 30s 35s 40s 45s 50s 55s 1m0s 5m0s 10m0s 15m0s 20m0s 25m0s 30m0s 35m0s 40m0s 45m0s 50m0s 55m0s]",
		fmt.Sprintf("%v", writeDurations))

	// NB(r): Bucket length is tested just to sanity check how many buckets we are creating
	require.Equal(t, 80, len(ingestDurations))

	// NB(r): Bucket values are tested to sanity check they look right
	// nolint: lll
	require.Equal(t,
		"[0s 100ms 200ms 300ms 400ms 500ms 600ms 700ms 800ms 900ms 1s 1.5s 2s 2.5s 3s 3.5s 4s 4.5s 5s 5.5s 6s 6.5s 7s 7.5s 8s 8.5s 9s 9.5s 10s 15s 20s 25s 30s 35s 40s 45s 50s 55s 1m0s 5m0s 10m0s 15m0s 20m0s 25m0s 30m0s 35m0s 40m0s 45m0s 50m0s 55m0s 1h0m0s 1h30m0s 2h0m0s 2h30m0s 3h0m0s 3h30m0s 4h0m0s 4h30m0s 5h0m0s 5h30m0s 6h0m0s 6h30m0s 7h0m0s 8h0m0s 9h0m0s 10h0m0s 11h0m0s 12h0m0s 13h0m0s 14h0m0s 15h0m0s 16h0m0s 17h0m0s 18h0m0s 19h0m0s 20h0m0s 21h0m0s 22h0m0s 23h0m0s 24h0m0s]",
		fmt.Sprintf("%v", ingestDurations))
}
53 changes: 6 additions & 47 deletions src/query/api/v1/handler/prometheus/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,63 +215,22 @@ func (m *promWriteMetrics) incError(err error) {
}

func newPromWriteMetrics(scope tally.Scope) (promWriteMetrics, error) {
upTo1sBuckets, err := tally.LinearDurationBuckets(0, 100*time.Millisecond, 10)
buckets, err := ingest.NewLatencyBuckets()
if err != nil {
return promWriteMetrics{}, err
}

upTo10sBuckets, err := tally.LinearDurationBuckets(time.Second, 500*time.Millisecond, 18)
if err != nil {
return promWriteMetrics{}, err
}

upTo60sBuckets, err := tally.LinearDurationBuckets(10*time.Second, 5*time.Second, 11)
if err != nil {
return promWriteMetrics{}, err
}

upTo60mBuckets, err := tally.LinearDurationBuckets(0, 5*time.Minute, 12)
if err != nil {
return promWriteMetrics{}, err
}
upTo60mBuckets = upTo60mBuckets[1:] // Remove the first 0s to get 5 min aligned buckets

upTo6hBuckets, err := tally.LinearDurationBuckets(time.Hour, 30*time.Minute, 12)
if err != nil {
return promWriteMetrics{}, err
}

upTo24hBuckets, err := tally.LinearDurationBuckets(6*time.Hour, time.Hour, 19)
if err != nil {
return promWriteMetrics{}, err
}
upTo24hBuckets = upTo24hBuckets[1:] // Remove the first 6h to get 1 hour aligned buckets

var writeLatencyBuckets tally.DurationBuckets
writeLatencyBuckets = append(writeLatencyBuckets, upTo1sBuckets...)
writeLatencyBuckets = append(writeLatencyBuckets, upTo10sBuckets...)
writeLatencyBuckets = append(writeLatencyBuckets, upTo60sBuckets...)
writeLatencyBuckets = append(writeLatencyBuckets, upTo60mBuckets...)

var ingestLatencyBuckets tally.DurationBuckets
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo1sBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo10sBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60sBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60mBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo6hBuckets...)
ingestLatencyBuckets = append(ingestLatencyBuckets, upTo24hBuckets...)
return promWriteMetrics{
writeSuccess: scope.SubScope("write").Counter("success"),
writeErrorsServer: scope.SubScope("write").Tagged(map[string]string{"code": "5XX"}).Counter("errors"),
writeErrorsClient: scope.SubScope("write").Tagged(map[string]string{"code": "4XX"}).Counter("errors"),
writeBatchLatency: scope.SubScope("write").Histogram("batch-latency", writeLatencyBuckets),
writeBatchLatencyBuckets: writeLatencyBuckets,
ingestLatency: scope.SubScope("ingest").Histogram("latency", ingestLatencyBuckets),
ingestLatencyBuckets: ingestLatencyBuckets,
writeBatchLatency: scope.SubScope("write").Histogram("batch-latency", buckets.WriteLatencyBuckets),
writeBatchLatencyBuckets: buckets.WriteLatencyBuckets,
ingestLatency: scope.SubScope("ingest").Histogram("latency", buckets.IngestLatencyBuckets),
ingestLatencyBuckets: buckets.IngestLatencyBuckets,
forwardSuccess: scope.SubScope("forward").Counter("success"),
forwardErrors: scope.SubScope("forward").Counter("errors"),
forwardDropped: scope.SubScope("forward").Counter("dropped"),
forwardLatency: scope.SubScope("forward").Histogram("latency", writeLatencyBuckets),
forwardLatency: scope.SubScope("forward").Histogram("latency", buckets.WriteLatencyBuckets),
}, nil
}

Expand Down

0 comments on commit de4894f

Please sign in to comment.