diff --git a/src/aggregator/aggregation/timer.go b/src/aggregator/aggregation/timer.go index 34ccd5b8b5..edbb5860e2 100644 --- a/src/aggregator/aggregation/timer.go +++ b/src/aggregator/aggregation/timer.go @@ -158,4 +158,6 @@ func (t *Timer) Annotation() []byte { } // Close closes the timer. -func (t *Timer) Close() { t.stream.Close() } +func (t *Timer) Close() { + t.stream.Close() +} diff --git a/src/aggregator/aggregator/counter_elem_gen.go b/src/aggregator/aggregator/counter_elem_gen.go index 7cdcf087e6..815cde7561 100644 --- a/src/aggregator/aggregator/counter_elem_gen.go +++ b/src/aggregator/aggregator/counter_elem_gen.go @@ -63,7 +63,7 @@ type timedCounter struct { // close is called when the aggregation has been expired or the element is being closed. func (ta *timedCounter) close() { - ta.lockedAgg.aggregation.Close() + ta.lockedAgg.close() ta.lockedAgg = nil } @@ -681,12 +681,17 @@ func (e *CounterElem) findOrCreate( sourcesSeen = make(map[uint32]*bitset.BitSet) } } + // NB(vytenis): lockedCounterAggregation will be returned to pool on timedCounter close. + // this is a bit different from regular pattern of using a pool object due to codegen with Genny limitations, + // so we can avoid writing more boilerplate. + // timedCounter itself is always pass-by-value, but lockedCounterAggregation incurs an expensive allocation on heap + // in the critical path (30%+, depending on workload as of 2020-05-01): see https://github.com/m3db/m3/pull/4109 timedAgg = timedCounter{ startAt: alignedStart, - lockedAgg: &lockedCounterAggregation{ - sourcesSeen: sourcesSeen, - aggregation: e.NewAggregation(e.opts, e.aggOpts), - }, + lockedAgg: lockedCounterAggregationFromPool( + e.NewAggregation(e.opts, e.aggOpts), + sourcesSeen, + ), inDirtySet: true, } diff --git a/src/aggregator/aggregator/elem_base.go b/src/aggregator/aggregator/elem_base.go index 1dd943320f..37bd272af9 100644 --- a/src/aggregator/aggregator/elem_base.go +++ b/src/aggregator/aggregator/elem_base.go @@ -739,3 +739,76 @@ func newParsedPipeline(pipeline applied.Pipeline) (parsedPipeline, error) { Rollup: rollup, }, nil } + +// Placeholder to make compiler happy about generic elem base. +// NB: lockedAggregationFromPool and not newLockedAggregation to avoid yet another rename hack in makefile +func lockedAggregationFromPool( + aggregation typeSpecificAggregation, + sourcesSeen map[uint32]*bitset.BitSet, +) *lockedAggregation { + return &lockedAggregation{ + aggregation: aggregation, + sourcesSeen: sourcesSeen, + } +} + +func (l *lockedAggregation) close() { + l.aggregation.Close() +} + +var lockedCounterAggregationPool = sync.Pool{New: func() interface{} { return &lockedCounterAggregation{} }} + +func lockedCounterAggregationFromPool( + aggregation counterAggregation, + sourcesSeen map[uint32]*bitset.BitSet, +) *lockedCounterAggregation { + l := lockedCounterAggregationPool.Get().(*lockedCounterAggregation) + l.aggregation = aggregation + l.sourcesSeen = sourcesSeen + + return l +} + +func (l *lockedCounterAggregation) close() { + l.aggregation.Close() + *l = lockedCounterAggregation{} + lockedCounterAggregationPool.Put(l) +} + +var lockedGaugeAggregationPool = sync.Pool{New: func() interface{} { return &lockedGaugeAggregation{} }} + +func lockedGaugeAggregationFromPool( + aggregation gaugeAggregation, + sourcesSeen map[uint32]*bitset.BitSet, +) *lockedGaugeAggregation { + l := lockedGaugeAggregationPool.Get().(*lockedGaugeAggregation) + l.aggregation = aggregation + l.sourcesSeen = sourcesSeen + + return l +} + +func (l *lockedGaugeAggregation) close() { + l.aggregation.Close() + *l = lockedGaugeAggregation{} + lockedGaugeAggregationPool.Put(l) +} + +var lockedTimerAggregationPool = sync.Pool{New: func() interface{} { return &lockedTimerAggregation{} }} + +func lockedTimerAggregationFromPool( + aggregation timerAggregation, + sourcesSeen map[uint32]*bitset.BitSet, +) *lockedTimerAggregation { + l := lockedTimerAggregationPool.Get().(*lockedTimerAggregation) + l.aggregation = aggregation + l.sourcesSeen = sourcesSeen + + return l +} + +func (l *lockedTimerAggregation) close() { + l.aggregation.Close() + *l = lockedTimerAggregation{} + lockedTimerAggregationPool.Put(l) +} diff --git a/src/aggregator/aggregator/gauge_elem_gen.go b/src/aggregator/aggregator/gauge_elem_gen.go index d16c246c88..8c3b27b09c 100644 --- a/src/aggregator/aggregator/gauge_elem_gen.go +++ b/src/aggregator/aggregator/gauge_elem_gen.go @@ -63,7 +63,7 @@ type timedGauge struct { // close is called when the aggregation has been expired or the element is being closed. func (ta *timedGauge) close() { - ta.lockedAgg.aggregation.Close() + ta.lockedAgg.close() ta.lockedAgg = nil } @@ -681,12 +681,17 @@ func (e *GaugeElem) findOrCreate( sourcesSeen = make(map[uint32]*bitset.BitSet) } } + // NB(vytenis): lockedGaugeAggregation will be returned to pool on timedGauge close. + // this is a bit different from regular pattern of using a pool object due to codegen with Genny limitations, + // so we can avoid writing more boilerplate. + // timedGauge itself is always pass-by-value, but lockedGaugeAggregation incurs an expensive allocation on heap + // in the critical path (30%+, depending on workload as of 2020-05-01): see https://github.com/m3db/m3/pull/4109 timedAgg = timedGauge{ startAt: alignedStart, - lockedAgg: &lockedGaugeAggregation{ - sourcesSeen: sourcesSeen, - aggregation: e.NewAggregation(e.opts, e.aggOpts), - }, + lockedAgg: lockedGaugeAggregationFromPool( + e.NewAggregation(e.opts, e.aggOpts), + sourcesSeen, + ), inDirtySet: true, } diff --git a/src/aggregator/aggregator/generic_elem.go b/src/aggregator/aggregator/generic_elem.go index e14ec1aa8b..9101dbfbb2 100644 --- a/src/aggregator/aggregator/generic_elem.go +++ b/src/aggregator/aggregator/generic_elem.go @@ -126,7 +126,7 @@ type timedAggregation struct { // close is called when the aggregation has been expired or the element is being closed. func (ta *timedAggregation) close() { - ta.lockedAgg.aggregation.Close() + ta.lockedAgg.close() ta.lockedAgg = nil } @@ -744,12 +744,17 @@ func (e *GenericElem) findOrCreate( sourcesSeen = make(map[uint32]*bitset.BitSet) } } + // NB(vytenis): lockedAggregation will be returned to pool on timedAggregation close. + // this is a bit different from regular pattern of using a pool object due to codegen with Genny limitations, + // so we can avoid writing more boilerplate. + // timedAggregation itself is always pass-by-value, but lockedAggregation incurs an expensive allocation on heap + // in the critical path (30%+, depending on workload as of 2020-05-01): see https://github.com/m3db/m3/pull/4109 timedAgg = timedAggregation{ startAt: alignedStart, - lockedAgg: &lockedAggregation{ - sourcesSeen: sourcesSeen, - aggregation: e.NewAggregation(e.opts, e.aggOpts), - }, + lockedAgg: lockedAggregationFromPool( + e.NewAggregation(e.opts, e.aggOpts), + sourcesSeen, + ), inDirtySet: true, } diff --git a/src/aggregator/aggregator/timer_elem_gen.go b/src/aggregator/aggregator/timer_elem_gen.go index 80786cc640..768e621e04 100644 --- a/src/aggregator/aggregator/timer_elem_gen.go +++ b/src/aggregator/aggregator/timer_elem_gen.go @@ -63,7 +63,7 @@ type timedTimer struct { // close is called when the aggregation has been expired or the element is being closed. func (ta *timedTimer) close() { - ta.lockedAgg.aggregation.Close() + ta.lockedAgg.close() ta.lockedAgg = nil } @@ -681,12 +681,17 @@ func (e *TimerElem) findOrCreate( sourcesSeen = make(map[uint32]*bitset.BitSet) } } + // NB(vytenis): lockedTimerAggregation will be returned to pool on timedTimer close. + // this is a bit different from regular pattern of using a pool object due to codegen with Genny limitations, + // so we can avoid writing more boilerplate. + // timedTimer itself is always pass-by-value, but lockedTimerAggregation incurs an expensive allocation on heap + // in the critical path (30%+, depending on workload as of 2020-05-01): see https://github.com/m3db/m3/pull/4109 timedAgg = timedTimer{ startAt: alignedStart, - lockedAgg: &lockedTimerAggregation{ - sourcesSeen: sourcesSeen, - aggregation: e.NewAggregation(e.opts, e.aggOpts), - }, + lockedAgg: lockedTimerAggregationFromPool( + e.NewAggregation(e.opts, e.aggOpts), + sourcesSeen, + ), inDirtySet: true, }