Skip to content

Commit

Permalink
Merge branch 'master' into rhall-dup-rollup-take2
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanhall07 authored Dec 1, 2021
2 parents aeb82ea + 29a3cc9 commit 479954f
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 20 deletions.
3 changes: 2 additions & 1 deletion src/aggregator/aggregator/counter_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 47 additions & 10 deletions src/aggregator/aggregator/elem_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ const (
// compute first-order derivatives. This applies to the most common usecases
// without imposing significant bookkeeping overhead.
maxSupportedTransformationDerivativeOrder = 1
listTypeLabel = "list-type"
resolutionLabel = "resolution"
flushTypeLabel = "flush-type"
)

var (
Expand Down Expand Up @@ -187,6 +190,7 @@ type elemBase struct {
cachedSourceSets []map[uint32]*bitset.BitSet // nolint: structcheck
// a cache of the flush metrics that don't require grabbing a lock to access.
flushMetricsCache map[flushKey]flushMetrics
writeMetrics writeMetrics
tombstoned bool
closed bool
useDefaultAggregation bool // really immutable, but packed w/ the rest of bools
Expand Down Expand Up @@ -240,10 +244,10 @@ func (f *flushState) close() {
}

type elemMetrics struct {
scope tally.Scope
updatedValues tally.Counter
flush map[flushKey]flushMetrics
mtx sync.RWMutex
scope tally.Scope
write map[metricListType]writeMetrics
flush map[flushKey]flushMetrics
mtx sync.RWMutex
}

// flushMetrics are the metrics produced by a flush task processing the metric element.
Expand All @@ -258,6 +262,11 @@ type flushMetrics struct {
forwardLags map[forwardKey]tally.Histogram
}

type writeMetrics struct {
writes tally.Counter
updatedValues tally.Counter
}

func newFlushMetrics(scope tally.Scope) flushMetrics {
forwardLagBuckets := tally.DurationBuckets{
10 * time.Millisecond,
Expand Down Expand Up @@ -294,6 +303,13 @@ func newFlushMetrics(scope tally.Scope) flushMetrics {
return m
}

func newWriteMetrics(scope tally.Scope) writeMetrics {
return writeMetrics{
updatedValues: scope.Counter("updated-values"),
writes: scope.Counter("writes"),
}
}

func (f flushMetrics) forwardLag(key forwardKey) tally.Histogram {
return f.forwardLags[key]
}
Expand Down Expand Up @@ -328,9 +344,9 @@ func (f forwardKey) toTags() map[string]string {

func (f flushKey) toTags() map[string]string {
return map[string]string{
"resolution": f.resolution.String(),
"list-type": f.listType.String(),
"flush-type": f.flushType.String(),
resolutionLabel: f.resolution.String(),
listTypeLabel: f.listType.String(),
flushTypeLabel: f.flushType.String(),
}
}

Expand All @@ -354,6 +370,26 @@ func (e *elemMetrics) flushMetrics(key flushKey) flushMetrics {
return m
}

func (e *elemMetrics) writeMetrics(key metricListType) writeMetrics {
e.mtx.RLock()
m, ok := e.write[key]
if ok {
e.mtx.RUnlock()
return m
}
e.mtx.RUnlock()
e.mtx.Lock()
m, ok = e.write[key]
if ok {
e.mtx.Unlock()
return m
}
m = newWriteMetrics(e.scope.Tagged(map[string]string{listTypeLabel: key.String()}))
e.write[key] = m
e.mtx.Unlock()
return m
}

// ElemOptions are the parameters for constructing a new elemBase.
type ElemOptions struct {
aggregatorOpts Options
Expand All @@ -368,9 +404,9 @@ func NewElemOptions(aggregatorOpts Options) ElemOptions {
aggregatorOpts: aggregatorOpts,
aggregationOpts: raggregation.NewOptions(aggregatorOpts.InstrumentOptions()),
elemMetrics: &elemMetrics{
updatedValues: scope.Counter("updated-values"),
scope: scope,
flush: make(map[flushKey]flushMetrics),
scope: scope,
write: make(map[metricListType]writeMetrics),
flush: make(map[flushKey]flushMetrics),
},
}
}
Expand Down Expand Up @@ -420,6 +456,7 @@ func (e *elemBase) resetSetData(data ElemData, useDefaultAggregation bool) error
e.closed = false
e.idPrefixSuffixType = data.IDPrefixSuffixType
e.listType = data.ListType
e.writeMetrics = e.metrics.writeMetrics(e.listType)
return nil
}

Expand Down
17 changes: 11 additions & 6 deletions src/aggregator/aggregator/elem_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/m3db/m3/src/metrics/pipeline"
"github.com/m3db/m3/src/metrics/pipeline/applied"
"github.com/m3db/m3/src/metrics/transformation"
"github.com/m3db/m3/src/x/clock"

"github.com/stretchr/testify/require"
)
Expand All @@ -44,7 +45,7 @@ func mustNewOp(t require.TestingT, ttype transformation.Type) transformation.Op
}

func TestElemBaseID(t *testing.T) {
e := &elemBase{}
e := newTestElemBase()
require.NoError(t, e.resetSetData(testCounterElemData, false))
require.Equal(t, testCounterID, e.ID())
}
Expand Down Expand Up @@ -86,7 +87,7 @@ func TestElemBaseResetSetData(t *testing.T) {
},
}),
}
e := &elemBase{}
e := newTestElemBase()
elemData := testCounterElemData
elemData.AggTypes = testAggregationTypesExpensive
elemData.NumForwardedTimes = 3
Expand All @@ -112,7 +113,7 @@ func TestElemBaseResetSetDataNoRollup(t *testing.T) {
Transformation: pipeline.TransformationOp{Type: transformation.Absolute},
},
})
e := &elemBase{}
e := newTestElemBase()
elemData := testCounterElemData
elemData.Pipeline = pipelineNoRollup
err := e.resetSetData(elemData, false)
Expand All @@ -126,7 +127,7 @@ func TestElemBaseForwardedIDWithDefaultPipeline(t *testing.T) {
}

func TestElemBaseForwardedIDWithCustomPipeline(t *testing.T) {
e := &elemBase{}
e := newTestElemBase()
require.NoError(t, e.resetSetData(testCounterElemData, false))
fid, ok := e.ForwardedID()
require.True(t, ok)
Expand All @@ -139,8 +140,12 @@ func TestElemBaseForwardedAggregationKeyWithDefaultPipeline(t *testing.T) {
require.False(t, ok)
}

func newTestElemBase() elemBase {
return newElemBase(NewElemOptions(NewOptions(clock.NewOptions())))
}

func TestElemBaseForwardedAggregationKeyWithCustomPipeline(t *testing.T) {
e := &elemBase{}
e := newTestElemBase()
elemData := testCounterElemData
elemData.NumForwardedTimes = 3
require.NoError(t, e.resetSetData(elemData, false))
Expand All @@ -164,7 +169,7 @@ func TestElemBaseForwardedAggregationKeyWithCustomPipeline(t *testing.T) {
}

func TestElemBaseMarkAsTombStoned(t *testing.T) {
e := &elemBase{}
e := newTestElemBase()
require.False(t, e.tombstoned)

// Marking a closed element tombstoned has no impact.
Expand Down
3 changes: 2 additions & 1 deletion src/aggregator/aggregator/gauge_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/aggregator/aggregator/generic_elem.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (e *GenericElem) AddUnique(
versionsSeen.Set(version)

if metric.Version > 0 {
e.metrics.updatedValues.Inc(1)
e.writeMetrics.updatedValues.Inc(1)
for i := range metric.Values {
if err := lockedAgg.aggregation.UpdateVal(timestamp, metric.Values[i], metric.PrevValues[i]); err != nil {
return err
Expand Down Expand Up @@ -693,6 +693,7 @@ func (e *GenericElem) findOrCreate(
alignedStartNanos int64,
createOpts createAggregationOptions,
) (*lockedAggregation, error) {
e.writeMetrics.writes.Inc(1)
alignedStart := xtime.UnixNano(alignedStartNanos)
found, err := e.find(alignedStart)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion src/aggregator/aggregator/timer_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 479954f

Please sign in to comment.