Skip to content

Commit

Permalink
Move aggs to internal/aggregate (#4283)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias authored Jul 3, 2023
1 parent 97273da commit 10c3445
Show file tree
Hide file tree
Showing 17 changed files with 75 additions and 74 deletions.
16 changes: 8 additions & 8 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand Down Expand Up @@ -171,7 +171,7 @@ type streamID struct {
}

type int64Inst struct {
aggregators []internal.Aggregator[int64]
aggregators []aggregate.Aggregator[int64]

embedded.Int64Counter
embedded.Int64UpDownCounter
Expand All @@ -192,7 +192,7 @@ func (i *int64Inst) Record(ctx context.Context, val int64, opts ...metric.Record
i.aggregate(ctx, val, c.Attributes())
}

func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) {
func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) { // nolint:revive // okay to shadow pkg with method.
if err := ctx.Err(); err != nil {
return
}
Expand All @@ -202,7 +202,7 @@ func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) {
}

type float64Inst struct {
aggregators []internal.Aggregator[float64]
aggregators []aggregate.Aggregator[float64]

embedded.Float64Counter
embedded.Float64UpDownCounter
Expand Down Expand Up @@ -254,7 +254,7 @@ var _ metric.Float64ObservableCounter = float64Observable{}
var _ metric.Float64ObservableUpDownCounter = float64Observable{}
var _ metric.Float64ObservableGauge = float64Observable{}

func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []internal.Aggregator[float64]) float64Observable {
func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[float64]) float64Observable {
return float64Observable{
observable: newObservable(scope, kind, name, desc, u, agg),
}
Expand All @@ -273,7 +273,7 @@ var _ metric.Int64ObservableCounter = int64Observable{}
var _ metric.Int64ObservableUpDownCounter = int64Observable{}
var _ metric.Int64ObservableGauge = int64Observable{}

func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []internal.Aggregator[int64]) int64Observable {
func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[int64]) int64Observable {
return int64Observable{
observable: newObservable(scope, kind, name, desc, u, agg),
}
Expand All @@ -283,10 +283,10 @@ type observable[N int64 | float64] struct {
metric.Observable
observablID[N]

aggregators []internal.Aggregator[N]
aggregators []aggregate.Aggregator[N]
}

func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []internal.Aggregator[N]) *observable[N] {
func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[N]) *observable[N] {
return &observable[N]{
observablID: observablID[N]{
name: name,
Expand Down
18 changes: 9 additions & 9 deletions sdk/metric/instrument_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"testing"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

func BenchmarkInstrument(b *testing.B) {
Expand All @@ -32,10 +32,10 @@ func BenchmarkInstrument(b *testing.B) {
}

b.Run("instrumentImpl/aggregate", func(b *testing.B) {
inst := int64Inst{aggregators: []internal.Aggregator[int64]{
internal.NewLastValue[int64](),
internal.NewCumulativeSum[int64](true),
internal.NewDeltaSum[int64](true),
inst := int64Inst{aggregators: []aggregate.Aggregator[int64]{
aggregate.NewLastValue[int64](),
aggregate.NewCumulativeSum[int64](true),
aggregate.NewDeltaSum[int64](true),
}}
ctx := context.Background()

Expand All @@ -47,10 +47,10 @@ func BenchmarkInstrument(b *testing.B) {
})

b.Run("observable/observe", func(b *testing.B) {
o := observable[int64]{aggregators: []internal.Aggregator[int64]{
internal.NewLastValue[int64](),
internal.NewCumulativeSum[int64](true),
internal.NewDeltaSum[int64](true),
o := observable[int64]{aggregators: []aggregate.Aggregator[int64]{
aggregate.NewLastValue[int64](),
aggregate.NewCumulativeSum[int64](true),
aggregate.NewDeltaSum[int64](true),
}}

b.ReportAllocs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal
package aggregate

import (
"context"
Expand Down Expand Up @@ -105,7 +105,7 @@ func Example() {
_, _ = m.Int64Histogram("histogram example")

// Output:
// using *internal.cumulativeSum[int64] aggregator for counter
// using *internal.lastValue[int64] aggregator for up-down counter
// using *internal.deltaHistogram[int64] aggregator for histogram
// using *aggregate.cumulativeSum[int64] aggregator for counter
// using *aggregate.lastValue[int64] aggregator for up-down counter
// using *aggregate.deltaHistogram[int64] aggregator for histogram
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"strconv"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package internal provides types and functionality used to aggregate and
// Package aggregate provides aggregate types used compute aggregations and
// cycle the state of metric measurements made by the SDK. These types and
// functionality are meant only for internal SDK use.
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"go.opentelemetry.io/otel/attribute"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"sort"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"sort"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"sync"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"sync"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"testing"
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

var (
Expand Down Expand Up @@ -441,7 +441,7 @@ func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string,
return &int64InstProvider{scope: s, pipes: p, resolve: newResolver[int64](p, c)}
}

func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]internal.Aggregator[int64], error) {
func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Aggregator[int64], error) {
inst := Instrument{
Name: name,
Description: desc,
Expand Down Expand Up @@ -469,7 +469,7 @@ func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[strin
return &float64InstProvider{scope: s, pipes: p, resolve: newResolver[float64](p, c)}
}

func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]internal.Aggregator[float64], error) {
func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Aggregator[float64], error) {
inst := Instrument{
Name: name,
Description: desc,
Expand Down
33 changes: 17 additions & 16 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
)
Expand Down Expand Up @@ -233,16 +234,16 @@ func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *i
//
// If an instrument is determined to use a Drop aggregation, that instrument is
// not inserted nor returned.
func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], error) {
func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Aggregator[N], error) {
var (
matched bool
aggs []internal.Aggregator[N]
aggs []aggregate.Aggregator[N]
)

errs := &multierror{wrapped: errCreatingAggregators}
// The cache will return the same Aggregator instance. Use this fact to
// compare pointer addresses to deduplicate Aggregators.
seen := make(map[internal.Aggregator[N]]struct{})
seen := make(map[aggregate.Aggregator[N]]struct{})
for _, v := range i.pipeline.views {
stream, match := v(inst)
if !match {
Expand Down Expand Up @@ -288,7 +289,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], err

// aggVal is the cached value in an aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Aggregator aggregate.Aggregator[N]
Err error
}

Expand All @@ -305,7 +306,7 @@ type aggVal[N int64 | float64] struct {
//
// If the instrument defines an unknown or incompatible aggregation, an error
// is returned.
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (internal.Aggregator[N], error) {
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (aggregate.Aggregator[N], error) {
switch stream.Aggregation.(type) {
case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader.
Expand All @@ -332,7 +333,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
return aggVal[N]{nil, nil}
}
if stream.AttributeFilter != nil {
agg = internal.NewFilter(agg, stream.AttributeFilter)
agg = aggregate.NewFilter(agg, stream.AttributeFilter)
}

i.pipeline.addSync(scope, instrumentSync{
Expand Down Expand Up @@ -388,14 +389,14 @@ func (i *inserter[N]) streamID(kind InstrumentKind, stream Stream) streamID {
// aggregator returns a new Aggregator matching agg, kind, temporality, and
// monotonic. If the agg is unknown or temporality is invalid, an error is
// returned.
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKind, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) {
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKind, temporality metricdata.Temporality, monotonic bool) (aggregate.Aggregator[N], error) {
switch a := agg.(type) {
case aggregation.Default:
return i.aggregator(DefaultAggregationSelector(kind), kind, temporality, monotonic)
case aggregation.Drop:
return nil, nil
case aggregation.LastValue:
return internal.NewLastValue[N](), nil
return aggregate.NewLastValue[N](), nil
case aggregation.Sum:
switch kind {
case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter:
Expand All @@ -404,28 +405,28 @@ func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKin
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/metrics/api.md#asynchronous-counter-creation
switch temporality {
case metricdata.CumulativeTemporality:
return internal.NewPrecomputedCumulativeSum[N](monotonic), nil
return aggregate.NewPrecomputedCumulativeSum[N](monotonic), nil
case metricdata.DeltaTemporality:
return internal.NewPrecomputedDeltaSum[N](monotonic), nil
return aggregate.NewPrecomputedDeltaSum[N](monotonic), nil
default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
}
}

switch temporality {
case metricdata.CumulativeTemporality:
return internal.NewCumulativeSum[N](monotonic), nil
return aggregate.NewCumulativeSum[N](monotonic), nil
case metricdata.DeltaTemporality:
return internal.NewDeltaSum[N](monotonic), nil
return aggregate.NewDeltaSum[N](monotonic), nil
default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
}
case aggregation.ExplicitBucketHistogram:
switch temporality {
case metricdata.CumulativeTemporality:
return internal.NewCumulativeHistogram[N](a), nil
return aggregate.NewCumulativeHistogram[N](a), nil
case metricdata.DeltaTemporality:
return internal.NewDeltaHistogram[N](a), nil
return aggregate.NewDeltaHistogram[N](a), nil
default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
}
Expand Down Expand Up @@ -536,8 +537,8 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) re

// Aggregators returns the Aggregators that must be updated by the instrument
// defined by key.
func (r resolver[N]) Aggregators(id Instrument) ([]internal.Aggregator[N], error) {
var aggs []internal.Aggregator[N]
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Aggregator[N], error) {
var aggs []aggregate.Aggregator[N]

errs := &multierror{}
for _, i := range r.inserters {
Expand Down
Loading

0 comments on commit 10c3445

Please sign in to comment.