diff --git a/src/aggregator/aggregator/counter_elem_gen.go b/src/aggregator/aggregator/counter_elem_gen.go index 40ca12f331..f40e5922f0 100644 --- a/src/aggregator/aggregator/counter_elem_gen.go +++ b/src/aggregator/aggregator/counter_elem_gen.go @@ -206,7 +206,7 @@ func (e *CounterElem) AddUnique( versionsSeen.Set(version) if metric.Version > 0 { - e.metrics.updatedValues.Inc(1) + e.writeMetrics.updatedValues.Inc(1) for i := range metric.Values { if err := lockedAgg.aggregation.UpdateVal(timestamp, metric.Values[i], metric.PrevValues[i]); err != nil { return err @@ -630,6 +630,7 @@ func (e *CounterElem) findOrCreate( alignedStartNanos int64, createOpts createAggregationOptions, ) (*lockedCounterAggregation, error) { + e.writeMetrics.writes.Inc(1) alignedStart := xtime.UnixNano(alignedStartNanos) found, err := e.find(alignedStart) if err != nil { diff --git a/src/aggregator/aggregator/elem_base.go b/src/aggregator/aggregator/elem_base.go index 54e2ea6952..0a1f1b5750 100644 --- a/src/aggregator/aggregator/elem_base.go +++ b/src/aggregator/aggregator/elem_base.go @@ -61,6 +61,9 @@ const ( // compute first-order derivatives. This applies to the most common usecases // without imposing significant bookkeeping overhead. maxSupportedTransformationDerivativeOrder = 1 + listTypeLabel = "list-type" + resolutionLabel = "resolution" + flushTypeLabel = "flush-type" ) var ( @@ -187,6 +190,7 @@ type elemBase struct { cachedSourceSets []map[uint32]*bitset.BitSet // nolint: structcheck // a cache of the flush metrics that don't require grabbing a lock to access. flushMetricsCache map[flushKey]flushMetrics + writeMetrics writeMetrics tombstoned bool closed bool useDefaultAggregation bool // really immutable, but packed w/ the rest of bools @@ -240,10 +244,10 @@ func (f *flushState) close() { } type elemMetrics struct { - scope tally.Scope - updatedValues tally.Counter - flush map[flushKey]flushMetrics - mtx sync.RWMutex + scope tally.Scope + write map[metricListType]writeMetrics + flush map[flushKey]flushMetrics + mtx sync.RWMutex } // flushMetrics are the metrics produced by a flush task processing the metric element. @@ -258,6 +262,11 @@ type flushMetrics struct { forwardLags map[forwardKey]tally.Histogram } +type writeMetrics struct { + writes tally.Counter + updatedValues tally.Counter +} + func newFlushMetrics(scope tally.Scope) flushMetrics { forwardLagBuckets := tally.DurationBuckets{ 10 * time.Millisecond, @@ -294,6 +303,13 @@ func newFlushMetrics(scope tally.Scope) flushMetrics { return m } +func newWriteMetrics(scope tally.Scope) writeMetrics { + return writeMetrics{ + updatedValues: scope.Counter("updated-values"), + writes: scope.Counter("writes"), + } +} + func (f flushMetrics) forwardLag(key forwardKey) tally.Histogram { return f.forwardLags[key] } @@ -328,9 +344,9 @@ func (f forwardKey) toTags() map[string]string { func (f flushKey) toTags() map[string]string { return map[string]string{ - "resolution": f.resolution.String(), - "list-type": f.listType.String(), - "flush-type": f.flushType.String(), + resolutionLabel: f.resolution.String(), + listTypeLabel: f.listType.String(), + flushTypeLabel: f.flushType.String(), } } @@ -354,6 +370,26 @@ func (e *elemMetrics) flushMetrics(key flushKey) flushMetrics { return m } +func (e *elemMetrics) writeMetrics(key metricListType) writeMetrics { + e.mtx.RLock() + m, ok := e.write[key] + if ok { + e.mtx.RUnlock() + return m + } + e.mtx.RUnlock() + e.mtx.Lock() + m, ok = e.write[key] + if ok { + e.mtx.Unlock() + return m + } + m = newWriteMetrics(e.scope.Tagged(map[string]string{listTypeLabel: key.String()})) + e.write[key] = m + e.mtx.Unlock() + return m +} + // ElemOptions are the parameters for constructing a new elemBase. type ElemOptions struct { aggregatorOpts Options @@ -368,9 +404,9 @@ func NewElemOptions(aggregatorOpts Options) ElemOptions { aggregatorOpts: aggregatorOpts, aggregationOpts: raggregation.NewOptions(aggregatorOpts.InstrumentOptions()), elemMetrics: &elemMetrics{ - updatedValues: scope.Counter("updated-values"), - scope: scope, - flush: make(map[flushKey]flushMetrics), + scope: scope, + write: make(map[metricListType]writeMetrics), + flush: make(map[flushKey]flushMetrics), }, } } @@ -420,6 +456,7 @@ func (e *elemBase) resetSetData(data ElemData, useDefaultAggregation bool) error e.closed = false e.idPrefixSuffixType = data.IDPrefixSuffixType e.listType = data.ListType + e.writeMetrics = e.metrics.writeMetrics(e.listType) return nil } diff --git a/src/aggregator/aggregator/elem_base_test.go b/src/aggregator/aggregator/elem_base_test.go index 7f8ee0b2fc..63bd786dbc 100644 --- a/src/aggregator/aggregator/elem_base_test.go +++ b/src/aggregator/aggregator/elem_base_test.go @@ -33,6 +33,7 @@ import ( "github.com/m3db/m3/src/metrics/pipeline" "github.com/m3db/m3/src/metrics/pipeline/applied" "github.com/m3db/m3/src/metrics/transformation" + "github.com/m3db/m3/src/x/clock" "github.com/stretchr/testify/require" ) @@ -44,7 +45,7 @@ func mustNewOp(t require.TestingT, ttype transformation.Type) transformation.Op } func TestElemBaseID(t *testing.T) { - e := &elemBase{} + e := newTestElemBase() require.NoError(t, e.resetSetData(testCounterElemData, false)) require.Equal(t, testCounterID, e.ID()) } @@ -86,7 +87,7 @@ func TestElemBaseResetSetData(t *testing.T) { }, }), } - e := &elemBase{} + e := newTestElemBase() elemData := testCounterElemData elemData.AggTypes = testAggregationTypesExpensive elemData.NumForwardedTimes = 3 @@ -112,7 +113,7 @@ func TestElemBaseResetSetDataNoRollup(t *testing.T) { Transformation: pipeline.TransformationOp{Type: transformation.Absolute}, }, }) - e := &elemBase{} + e := newTestElemBase() elemData := testCounterElemData elemData.Pipeline = pipelineNoRollup err := e.resetSetData(elemData, false) @@ -126,7 +127,7 @@ func TestElemBaseForwardedIDWithDefaultPipeline(t *testing.T) { } func TestElemBaseForwardedIDWithCustomPipeline(t *testing.T) { - e := &elemBase{} + e := newTestElemBase() require.NoError(t, e.resetSetData(testCounterElemData, false)) fid, ok := e.ForwardedID() require.True(t, ok) @@ -139,8 +140,12 @@ func TestElemBaseForwardedAggregationKeyWithDefaultPipeline(t *testing.T) { require.False(t, ok) } +func newTestElemBase() elemBase { + return newElemBase(NewElemOptions(NewOptions(clock.NewOptions()))) +} + func TestElemBaseForwardedAggregationKeyWithCustomPipeline(t *testing.T) { - e := &elemBase{} + e := newTestElemBase() elemData := testCounterElemData elemData.NumForwardedTimes = 3 require.NoError(t, e.resetSetData(elemData, false)) @@ -164,7 +169,7 @@ func TestElemBaseForwardedAggregationKeyWithCustomPipeline(t *testing.T) { } func TestElemBaseMarkAsTombStoned(t *testing.T) { - e := &elemBase{} + e := newTestElemBase() require.False(t, e.tombstoned) // Marking a closed element tombstoned has no impact. diff --git a/src/aggregator/aggregator/gauge_elem_gen.go b/src/aggregator/aggregator/gauge_elem_gen.go index 967ab7abbf..65e60d1138 100644 --- a/src/aggregator/aggregator/gauge_elem_gen.go +++ b/src/aggregator/aggregator/gauge_elem_gen.go @@ -206,7 +206,7 @@ func (e *GaugeElem) AddUnique( versionsSeen.Set(version) if metric.Version > 0 { - e.metrics.updatedValues.Inc(1) + e.writeMetrics.updatedValues.Inc(1) for i := range metric.Values { if err := lockedAgg.aggregation.UpdateVal(timestamp, metric.Values[i], metric.PrevValues[i]); err != nil { return err @@ -630,6 +630,7 @@ func (e *GaugeElem) findOrCreate( alignedStartNanos int64, createOpts createAggregationOptions, ) (*lockedGaugeAggregation, error) { + e.writeMetrics.writes.Inc(1) alignedStart := xtime.UnixNano(alignedStartNanos) found, err := e.find(alignedStart) if err != nil { diff --git a/src/aggregator/aggregator/generic_elem.go b/src/aggregator/aggregator/generic_elem.go index 00664b0621..74e52178b1 100644 --- a/src/aggregator/aggregator/generic_elem.go +++ b/src/aggregator/aggregator/generic_elem.go @@ -269,7 +269,7 @@ func (e *GenericElem) AddUnique( versionsSeen.Set(version) if metric.Version > 0 { - e.metrics.updatedValues.Inc(1) + e.writeMetrics.updatedValues.Inc(1) for i := range metric.Values { if err := lockedAgg.aggregation.UpdateVal(timestamp, metric.Values[i], metric.PrevValues[i]); err != nil { return err @@ -693,6 +693,7 @@ func (e *GenericElem) findOrCreate( alignedStartNanos int64, createOpts createAggregationOptions, ) (*lockedAggregation, error) { + e.writeMetrics.writes.Inc(1) alignedStart := xtime.UnixNano(alignedStartNanos) found, err := e.find(alignedStart) if err != nil { diff --git a/src/aggregator/aggregator/timer_elem_gen.go b/src/aggregator/aggregator/timer_elem_gen.go index 1523ae21a9..b824d11120 100644 --- a/src/aggregator/aggregator/timer_elem_gen.go +++ b/src/aggregator/aggregator/timer_elem_gen.go @@ -206,7 +206,7 @@ func (e *TimerElem) AddUnique( versionsSeen.Set(version) if metric.Version > 0 { - e.metrics.updatedValues.Inc(1) + e.writeMetrics.updatedValues.Inc(1) for i := range metric.Values { if err := lockedAgg.aggregation.UpdateVal(timestamp, metric.Values[i], metric.PrevValues[i]); err != nil { return err @@ -630,6 +630,7 @@ func (e *TimerElem) findOrCreate( alignedStartNanos int64, createOpts createAggregationOptions, ) (*lockedTimerAggregation, error) { + e.writeMetrics.writes.Inc(1) alignedStart := xtime.UnixNano(alignedStartNanos) found, err := e.find(alignedStart) if err != nil {