Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move aggregators to internal/aggregate #4283

Merged
merged 3 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand Down Expand Up @@ -171,7 +171,7 @@ type streamID struct {
}

type int64Inst struct {
aggregators []internal.Aggregator[int64]
aggregators []aggregate.Aggregator[int64]

embedded.Int64Counter
embedded.Int64UpDownCounter
Expand All @@ -192,7 +192,7 @@ func (i *int64Inst) Record(ctx context.Context, val int64, opts ...metric.Record
i.aggregate(ctx, val, c.Attributes())
}

func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) {
func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) { // nolint:revive // okay to shadow pkg with method.
if err := ctx.Err(); err != nil {
return
}
Expand All @@ -202,7 +202,7 @@ func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) {
}

type float64Inst struct {
aggregators []internal.Aggregator[float64]
aggregators []aggregate.Aggregator[float64]

embedded.Float64Counter
embedded.Float64UpDownCounter
Expand Down Expand Up @@ -254,7 +254,7 @@ var _ metric.Float64ObservableCounter = float64Observable{}
var _ metric.Float64ObservableUpDownCounter = float64Observable{}
var _ metric.Float64ObservableGauge = float64Observable{}

func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []internal.Aggregator[float64]) float64Observable {
func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[float64]) float64Observable {
return float64Observable{
observable: newObservable(scope, kind, name, desc, u, agg),
}
Expand All @@ -273,7 +273,7 @@ var _ metric.Int64ObservableCounter = int64Observable{}
var _ metric.Int64ObservableUpDownCounter = int64Observable{}
var _ metric.Int64ObservableGauge = int64Observable{}

func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []internal.Aggregator[int64]) int64Observable {
func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[int64]) int64Observable {
return int64Observable{
observable: newObservable(scope, kind, name, desc, u, agg),
}
Expand All @@ -283,10 +283,10 @@ type observable[N int64 | float64] struct {
metric.Observable
observablID[N]

aggregators []internal.Aggregator[N]
aggregators []aggregate.Aggregator[N]
}

func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []internal.Aggregator[N]) *observable[N] {
func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[N]) *observable[N] {
return &observable[N]{
observablID: observablID[N]{
name: name,
Expand Down
18 changes: 9 additions & 9 deletions sdk/metric/instrument_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"testing"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

func BenchmarkInstrument(b *testing.B) {
Expand All @@ -32,10 +32,10 @@ func BenchmarkInstrument(b *testing.B) {
}

b.Run("instrumentImpl/aggregate", func(b *testing.B) {
inst := int64Inst{aggregators: []internal.Aggregator[int64]{
internal.NewLastValue[int64](),
internal.NewCumulativeSum[int64](true),
internal.NewDeltaSum[int64](true),
inst := int64Inst{aggregators: []aggregate.Aggregator[int64]{
aggregate.NewLastValue[int64](),
aggregate.NewCumulativeSum[int64](true),
aggregate.NewDeltaSum[int64](true),
}}
ctx := context.Background()

Expand All @@ -47,10 +47,10 @@ func BenchmarkInstrument(b *testing.B) {
})

b.Run("observable/observe", func(b *testing.B) {
o := observable[int64]{aggregators: []internal.Aggregator[int64]{
internal.NewLastValue[int64](),
internal.NewCumulativeSum[int64](true),
internal.NewDeltaSum[int64](true),
o := observable[int64]{aggregators: []aggregate.Aggregator[int64]{
aggregate.NewLastValue[int64](),
aggregate.NewCumulativeSum[int64](true),
aggregate.NewDeltaSum[int64](true),
}}

b.ReportAllocs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal
package aggregate

import (
"context"
Expand Down Expand Up @@ -105,7 +105,7 @@ func Example() {
_, _ = m.Int64Histogram("histogram example")

// Output:
// using *internal.cumulativeSum[int64] aggregator for counter
// using *internal.lastValue[int64] aggregator for up-down counter
// using *internal.deltaHistogram[int64] aggregator for histogram
// using *aggregate.cumulativeSum[int64] aggregator for counter
// using *aggregate.lastValue[int64] aggregator for up-down counter
// using *aggregate.deltaHistogram[int64] aggregator for histogram
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"strconv"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package internal provides types and functionality used to aggregate and
// Package aggregate provides aggregate types used compute aggregations and
// cycle the state of metric measurements made by the SDK. These types and
// functionality are meant only for internal SDK use.
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"go.opentelemetry.io/otel/attribute"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"sort"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"sort"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"sync"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"sync"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"testing"
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

var (
Expand Down Expand Up @@ -441,7 +441,7 @@ func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string,
return &int64InstProvider{scope: s, pipes: p, resolve: newResolver[int64](p, c)}
}

func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]internal.Aggregator[int64], error) {
func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Aggregator[int64], error) {
inst := Instrument{
Name: name,
Description: desc,
Expand Down Expand Up @@ -469,7 +469,7 @@ func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[strin
return &float64InstProvider{scope: s, pipes: p, resolve: newResolver[float64](p, c)}
}

func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]internal.Aggregator[float64], error) {
func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Aggregator[float64], error) {
inst := Instrument{
Name: name,
Description: desc,
Expand Down
33 changes: 17 additions & 16 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
)
Expand Down Expand Up @@ -233,16 +234,16 @@ func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *i
//
// If an instrument is determined to use a Drop aggregation, that instrument is
// not inserted nor returned.
func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], error) {
func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Aggregator[N], error) {
var (
matched bool
aggs []internal.Aggregator[N]
aggs []aggregate.Aggregator[N]
)

errs := &multierror{wrapped: errCreatingAggregators}
// The cache will return the same Aggregator instance. Use this fact to
// compare pointer addresses to deduplicate Aggregators.
seen := make(map[internal.Aggregator[N]]struct{})
seen := make(map[aggregate.Aggregator[N]]struct{})
for _, v := range i.pipeline.views {
stream, match := v(inst)
if !match {
Expand Down Expand Up @@ -288,7 +289,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], err

// aggVal is the cached value in an aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Aggregator aggregate.Aggregator[N]
Err error
}

Expand All @@ -305,7 +306,7 @@ type aggVal[N int64 | float64] struct {
//
// If the instrument defines an unknown or incompatible aggregation, an error
// is returned.
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (internal.Aggregator[N], error) {
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (aggregate.Aggregator[N], error) {
switch stream.Aggregation.(type) {
case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader.
Expand All @@ -332,7 +333,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
return aggVal[N]{nil, nil}
}
if stream.AttributeFilter != nil {
agg = internal.NewFilter(agg, stream.AttributeFilter)
agg = aggregate.NewFilter(agg, stream.AttributeFilter)
}

i.pipeline.addSync(scope, instrumentSync{
Expand Down Expand Up @@ -388,14 +389,14 @@ func (i *inserter[N]) streamID(kind InstrumentKind, stream Stream) streamID {
// aggregator returns a new Aggregator matching agg, kind, temporality, and
// monotonic. If the agg is unknown or temporality is invalid, an error is
// returned.
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKind, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) {
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKind, temporality metricdata.Temporality, monotonic bool) (aggregate.Aggregator[N], error) {
switch a := agg.(type) {
case aggregation.Default:
return i.aggregator(DefaultAggregationSelector(kind), kind, temporality, monotonic)
case aggregation.Drop:
return nil, nil
case aggregation.LastValue:
return internal.NewLastValue[N](), nil
return aggregate.NewLastValue[N](), nil
case aggregation.Sum:
switch kind {
case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter:
Expand All @@ -404,28 +405,28 @@ func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKin
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/metrics/api.md#asynchronous-counter-creation
switch temporality {
case metricdata.CumulativeTemporality:
return internal.NewPrecomputedCumulativeSum[N](monotonic), nil
return aggregate.NewPrecomputedCumulativeSum[N](monotonic), nil
case metricdata.DeltaTemporality:
return internal.NewPrecomputedDeltaSum[N](monotonic), nil
return aggregate.NewPrecomputedDeltaSum[N](monotonic), nil
default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
}
}

switch temporality {
case metricdata.CumulativeTemporality:
return internal.NewCumulativeSum[N](monotonic), nil
return aggregate.NewCumulativeSum[N](monotonic), nil
case metricdata.DeltaTemporality:
return internal.NewDeltaSum[N](monotonic), nil
return aggregate.NewDeltaSum[N](monotonic), nil
default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
}
case aggregation.ExplicitBucketHistogram:
switch temporality {
case metricdata.CumulativeTemporality:
return internal.NewCumulativeHistogram[N](a), nil
return aggregate.NewCumulativeHistogram[N](a), nil
case metricdata.DeltaTemporality:
return internal.NewDeltaHistogram[N](a), nil
return aggregate.NewDeltaHistogram[N](a), nil
default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
}
Expand Down Expand Up @@ -536,8 +537,8 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) re

// Aggregators returns the Aggregators that must be updated by the instrument
// defined by key.
func (r resolver[N]) Aggregators(id Instrument) ([]internal.Aggregator[N], error) {
var aggs []internal.Aggregator[N]
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Aggregator[N], error) {
var aggs []aggregate.Aggregator[N]

errs := &multierror{}
for _, i := range r.inserters {
Expand Down
Loading