Skip to content

Commit

Permalink
plumb selector to reservoir func
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Oct 15, 2024
1 parent 7acaf00 commit eefbce2
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
3 changes: 1 addition & 2 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvi

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and filter configuration.
func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
provider := DefaultExemplarReservoirProviderSelector(agg)
func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs))
}
Expand Down
5 changes: 4 additions & 1 deletion sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
// The view explicitly requested the default aggregation.
stream.Aggregation = DefaultAggregationSelector(kind)
}
if stream.ExemplarReservoirProviderSelector == nil {
stream.ExemplarReservoirProviderSelector = DefaultExemplarReservoirProviderSelector
}

if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
return nil, 0, fmt.Errorf(
Expand All @@ -352,7 +355,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
b := aggregate.Builder[N]{
Temporality: i.pipeline.reader.temporality(kind),
ReservoirFunc: reservoirFunc[N](stream.Aggregation, i.pipeline.exemplarFilter),
ReservoirFunc: reservoirFunc[N](stream.ExemplarReservoirProviderSelector(stream.Aggregation), i.pipeline.exemplarFilter),
}
b.Filter = stream.AttributeFilter
// A value less than or equal to zero will disable the aggregation
Expand Down

0 comments on commit eefbce2

Please sign in to comment.