Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aggregator] Pool locked aggregations #4109

Merged
merged 5 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/aggregator/aggregation/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,6 @@ func (t *Timer) Annotation() []byte {
}

// Close closes the timer.
func (t *Timer) Close() { t.stream.Close() }
func (t *Timer) Close() {
t.stream.Close()
}
15 changes: 10 additions & 5 deletions src/aggregator/aggregator/counter_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 73 additions & 0 deletions src/aggregator/aggregator/elem_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,3 +739,76 @@ func newParsedPipeline(pipeline applied.Pipeline) (parsedPipeline, error) {
Rollup: rollup,
}, nil
}

// Placeholder to make compiler happy about generic elem base.
// NB: lockedAggregationFromPool and not newLockedAggregation to avoid yet another rename hack in makefile
func lockedAggregationFromPool(
aggregation typeSpecificAggregation,
sourcesSeen map[uint32]*bitset.BitSet,
) *lockedAggregation {
return &lockedAggregation{
aggregation: aggregation,
sourcesSeen: sourcesSeen,
}
}

func (l *lockedAggregation) close() {
l.aggregation.Close()
}

var lockedCounterAggregationPool = sync.Pool{New: func() interface{} { return &lockedCounterAggregation{} }}

func lockedCounterAggregationFromPool(
aggregation counterAggregation,
sourcesSeen map[uint32]*bitset.BitSet,
) *lockedCounterAggregation {
l := lockedCounterAggregationPool.Get().(*lockedCounterAggregation)
l.aggregation = aggregation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: could factor the zeroing into a reset method, and reuse here + in close? Not a huge deal though.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I simplified and made close() foolproof

l.sourcesSeen = sourcesSeen

return l
}

func (l *lockedCounterAggregation) close() {
l.aggregation.Close()
*l = lockedCounterAggregation{}
lockedCounterAggregationPool.Put(l)
}

var lockedGaugeAggregationPool = sync.Pool{New: func() interface{} { return &lockedGaugeAggregation{} }}

func lockedGaugeAggregationFromPool(
aggregation gaugeAggregation,
sourcesSeen map[uint32]*bitset.BitSet,
) *lockedGaugeAggregation {
l := lockedGaugeAggregationPool.Get().(*lockedGaugeAggregation)
l.aggregation = aggregation
l.sourcesSeen = sourcesSeen

return l
}

func (l *lockedGaugeAggregation) close() {
l.aggregation.Close()
*l = lockedGaugeAggregation{}
lockedGaugeAggregationPool.Put(l)
}

var lockedTimerAggregationPool = sync.Pool{New: func() interface{} { return &lockedTimerAggregation{} }}

func lockedTimerAggregationFromPool(
aggregation timerAggregation,
sourcesSeen map[uint32]*bitset.BitSet,
) *lockedTimerAggregation {
l := lockedTimerAggregationPool.Get().(*lockedTimerAggregation)
l.aggregation = aggregation
l.sourcesSeen = sourcesSeen

return l
}

func (l *lockedTimerAggregation) close() {
l.aggregation.Close()
*l = lockedTimerAggregation{}
lockedTimerAggregationPool.Put(l)
}
15 changes: 10 additions & 5 deletions src/aggregator/aggregator/gauge_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 10 additions & 5 deletions src/aggregator/aggregator/generic_elem.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type timedAggregation struct {

// close is called when the aggregation has been expired or the element is being closed.
func (ta *timedAggregation) close() {
ta.lockedAgg.aggregation.Close()
ta.lockedAgg.close()
ta.lockedAgg = nil
}

Expand Down Expand Up @@ -744,12 +744,17 @@ func (e *GenericElem) findOrCreate(
sourcesSeen = make(map[uint32]*bitset.BitSet)
}
}
// NB(vytenis): lockedAggregation will be returned to pool on timedAggregation close.
// this is a bit different from regular pattern of using a pool object due to codegen with Genny limitations,
// so we can avoid writing more boilerplate.
// timedAggregation itself is always pass-by-value, but lockedAggregation incurs an expensive allocation on heap
// in the critical path (30%+, depending on workload as of 2020-05-01): see https://github.com/m3db/m3/pull/4109
timedAgg = timedAggregation{
startAt: alignedStart,
lockedAgg: &lockedAggregation{
sourcesSeen: sourcesSeen,
aggregation: e.NewAggregation(e.opts, e.aggOpts),
},
lockedAgg: lockedAggregationFromPool(
vdarulis marked this conversation as resolved.
Show resolved Hide resolved
e.NewAggregation(e.opts, e.aggOpts),
sourcesSeen,
),
inDirtySet: true,
}

Expand Down
15 changes: 10 additions & 5 deletions src/aggregator/aggregator/timer_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.