Skip to content

Commit

Permalink
[coordinator] Use table based approach for aggregation tile buffer pa…
Browse files Browse the repository at this point in the history
…st calculation (#1717)
  • Loading branch information
robskillington authored Jun 9, 2019
1 parent fb40fe6 commit c6fefee
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 31 deletions.
127 changes: 121 additions & 6 deletions integrations/grafana/m3coordinator_dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
"steppedLine": false,
"targets": [
{
"expr": "rate(coordinator_fetch_success{source=\"remote\"}[1m])",
"expr": "rate(coordinator_fetch_success{}[1m])",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "queries_per_second",
Expand Down Expand Up @@ -175,7 +175,7 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(coordinator_fetch_errors{source=\"remote\"}[1m])) by (code)",
"expr": "sum(rate(coordinator_fetch_errors{}[1m])) by (code)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{code}}",
Expand Down Expand Up @@ -274,7 +274,7 @@
"steppedLine": false,
"targets": [
{
"expr": "rate(coordinator_write_success{source=\"remote\"}[1m])",
"expr": "sum(irate(coordinator_m3db_client_write_success{}[1m]))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "write_per_second",
Expand All @@ -285,7 +285,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Writes Per Second",
"title": "Remote Writes Per Second",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down Expand Up @@ -360,7 +360,14 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(coordinator_write_errors{source=\"remote\"}[1m])) by (code)",
"expr": "sum(rate(coordinator_write_success{}[1m]))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "2XX",
"refId": "B"
},
{
"expr": "sum(rate(coordinator_write_errors{}[1m])) by (code)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{code}}",
Expand All @@ -371,7 +378,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Write Errors Per Second",
"title": "Remote Write Batch Requests Per Second",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down Expand Up @@ -407,6 +414,114 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$datasource",
"description": "This is the measurement of the difference between the server's relative time now() and the datapoint's timestamp after it is persisted (or fails) to be written.",
"fill": 1,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 18
},
"id": 17,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"paceLength": 10,
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "histogram_quantile(0.99, sum(rate(coordinator_ingest_latency_bucket[1m])) by (le))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "p99 latency",
"refId": "A"
},
{
"expr": "histogram_quantile(0.95, sum(rate(coordinator_ingest_latency_bucket[1m])) by (le))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "p95 latency",
"refId": "B"
},
{
"expr": "histogram_quantile(0.75, sum(rate(coordinator_ingest_latency_bucket[1m])) by (le))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "p75 latency",
"refId": "C"
},
{
"expr": "histogram_quantile(0.5, sum(rate(coordinator_ingest_latency_bucket[1m])) by (le))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "p50 latency",
"refId": "D"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Write Ingest Latency (Relative to datapoint timestamp)",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"refresh": false,
Expand Down
56 changes: 31 additions & 25 deletions src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,40 +48,24 @@ import (
"github.com/m3db/m3/src/metrics/rules"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/x/serialize"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
xsync "github.com/m3db/m3/src/x/sync"
)

const (
instanceID = "downsampler_local"
placementKVKey = "/placement"
replicationFactor = 1
defaultStorageFlushConcurrency = 20000
defaultOpenTimeout = 10 * time.Second
minBufferPast = 5 * time.Second
maxBufferPast = 10 * time.Minute
defaultBufferPastTimedMetricFactor = 0.1
defaultBufferFutureTimedMetric = time.Minute
instanceID = "downsampler_local"
placementKVKey = "/placement"
replicationFactor = 1
defaultStorageFlushConcurrency = 20000
defaultOpenTimeout = 10 * time.Second
defaultBufferFutureTimedMetric = time.Minute
)

var (
numShards = runtime.NumCPU()
defaultBufferForPastTimedMetricFn = func(r time.Duration) time.Duration {
value := time.Duration(defaultBufferPastTimedMetricFactor * float64(r))

// Clamp minBufferPast <= value <= maxBufferPast.
if value < minBufferPast {
return minBufferPast
}
if value > maxBufferPast {
return maxBufferPast
}

return value
}
numShards = runtime.NumCPU()

errNoStorage = errors.New("dynamic downsampling enabled with storage not set")
errNoClusterClient = errors.New("dynamic downsampling enabled with cluster client not set")
Expand Down Expand Up @@ -346,7 +330,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
SetElectionManager(electionManager).
SetFlushManager(flushManager).
SetFlushHandler(flushHandler).
SetBufferForPastTimedMetricFn(defaultBufferForPastTimedMetricFn).
SetBufferForPastTimedMetricFn(bufferForPastTimedMetric).
SetBufferForFutureTimedMetric(defaultBufferFutureTimedMetric)

if cfg.AggregationTypes != nil {
Expand Down Expand Up @@ -617,3 +601,25 @@ func (o DownsamplerOptions) newAggregatorFlushManagerAndHandler(

return flushManager, handler
}

var (
bufferPastLimits = []struct {
upperBound time.Duration
bufferPast time.Duration
}{
{upperBound: 0, bufferPast: 15 * time.Second},
{upperBound: 30 * time.Second, bufferPast: 30 * time.Second},
{upperBound: time.Minute, bufferPast: time.Minute},
}
)

func bufferForPastTimedMetric(tile time.Duration) time.Duration {
bufferPast := bufferPastLimits[0].bufferPast
for _, limit := range bufferPastLimits {
if tile < limit.upperBound {
return bufferPast
}
bufferPast = limit.bufferPast
}
return bufferPast
}
52 changes: 52 additions & 0 deletions src/cmd/services/m3coordinator/downsample/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package downsample

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestBufferForPastTimedMetric(t *testing.T) {
tests := []struct {
value time.Duration
expected time.Duration
}{
{value: -1 * time.Second, expected: 15 * time.Second},
{value: 0, expected: 15 * time.Second},
{value: 1 * time.Second, expected: 15 * time.Second},
{value: 29 * time.Second, expected: 15 * time.Second},
{value: 30 * time.Second, expected: 30 * time.Second},
{value: 59 * time.Second, expected: 30 * time.Second},
{value: 60 * time.Second, expected: time.Minute},
{value: 59 * time.Minute, expected: time.Minute},
{value: 61 * time.Minute, expected: time.Minute},
}
for _, test := range tests {
t.Run(fmt.Sprintf("test_value_%v", test.value), func(t *testing.T) {
result := bufferForPastTimedMetric(test.value)
require.Equal(t, test.expected, result)
})
}
}

0 comments on commit c6fefee

Please sign in to comment.