Skip to content

Commit

Permalink
Merge branch 'main' into container-id-cgroup-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
hanyuancheung authored Dec 26, 2022
2 parents f34b099 + 69e44a3 commit 5eaaa5c
Show file tree
Hide file tree
Showing 27 changed files with 832 additions and 200 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,27 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package.
This `Registration` can be used to unregister callbacks. (#3522)
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)

### Removed

- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)

### Changed

- Global error handler uses an atomic value instead of a mutex. (#3543)
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)
- Add `NewMetricProducer` to `go.opentelemetry.io/otel/bridge/opencensus`, which can be used to pass OpenCensus metrics to an OpenTelemetry Reader. (#3541)
- Global logger uses an atomic value instead of a mutex. (#3545)

### Deprecated

- The `NewMetricExporter` in `go.opentelemetry.io/otel/bridge/opencensus` is deprecated. Use `NewMetricProducer` instead. (#3541)

## [1.11.2/0.34.0] 2022-12-05

### Added
Expand Down
32 changes: 32 additions & 0 deletions bridge/opencensus/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

ocmetricdata "go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricexport"
"go.opencensus.io/metric/metricproducer"

"go.opentelemetry.io/otel"
internal "go.opentelemetry.io/otel/bridge/opencensus/internal/ocmetric"
Expand All @@ -33,6 +34,36 @@ import (

const scopeName = "go.opentelemetry.io/otel/bridge/opencensus"

type producer struct {
manager *metricproducer.Manager
}

// NewMetricProducer returns a metric.Producer that fetches metrics from
// OpenCensus.
func NewMetricProducer() metric.Producer {
return &producer{
manager: metricproducer.GlobalManager(),
}
}

func (p *producer) Produce(context.Context) ([]metricdata.ScopeMetrics, error) {
producers := p.manager.GetAll()
data := []*ocmetricdata.Metric{}
for _, ocProducer := range producers {
data = append(data, ocProducer.Read()...)
}
otelmetrics, err := internal.ConvertMetrics(data)
if len(otelmetrics) == 0 {
return nil, err
}
return []metricdata.ScopeMetrics{{
Scope: instrumentation.Scope{
Name: scopeName,
},
Metrics: otelmetrics,
}}, err
}

// exporter implements the OpenCensus metric Exporter interface using an
// OpenTelemetry base exporter.
type exporter struct {
Expand All @@ -42,6 +73,7 @@ type exporter struct {

// NewMetricExporter returns an OpenCensus exporter that exports to an
// OpenTelemetry (push) exporter.
// Deprecated: Use NewMetricProducer instead.
func NewMetricExporter(base metric.Exporter, res *resource.Resource) metricexport.Exporter {
return &exporter{base: base, res: res}
}
Expand Down
129 changes: 129 additions & 0 deletions bridge/opencensus/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/stretchr/testify/require"
ocmetricdata "go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
ocresource "go.opencensus.io/resource"

"go.opentelemetry.io/otel/attribute"
Expand All @@ -32,6 +33,134 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
)

func TestMetricProducer(t *testing.T) {
now := time.Now()
for _, tc := range []struct {
desc string
input []*ocmetricdata.Metric
expected []metricdata.ScopeMetrics
expectErr bool
}{
{
desc: "empty",
expected: nil,
},
{
desc: "success",
input: []*ocmetricdata.Metric{
{
Resource: &ocresource.Resource{
Labels: map[string]string{
"R1": "V1",
"R2": "V2",
},
},
TimeSeries: []*ocmetricdata.TimeSeries{
{
StartTime: now,
Points: []ocmetricdata.Point{
{Value: int64(123), Time: now},
},
},
},
},
},
expected: []metricdata.ScopeMetrics{{
Scope: instrumentation.Scope{
Name: scopeName,
},
Metrics: []metricdata.Metrics{
{
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(),
StartTime: now,
Time: now,
Value: 123,
},
},
},
},
},
}},
},
{
desc: "partial success",
input: []*ocmetricdata.Metric{
{
Descriptor: ocmetricdata.Descriptor{
Name: "foo.com/bad-point",
Description: "a bad type",
Unit: ocmetricdata.UnitDimensionless,
Type: ocmetricdata.TypeGaugeDistribution,
},
},
{
Resource: &ocresource.Resource{
Labels: map[string]string{
"R1": "V1",
"R2": "V2",
},
},
TimeSeries: []*ocmetricdata.TimeSeries{
{
StartTime: now,
Points: []ocmetricdata.Point{
{Value: int64(123), Time: now},
},
},
},
},
},
expected: []metricdata.ScopeMetrics{{
Scope: instrumentation.Scope{
Name: scopeName,
},
Metrics: []metricdata.Metrics{
{
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(),
StartTime: now,
Time: now,
Value: 123,
},
},
},
},
},
}},
expectErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
fakeProducer := &fakeOCProducer{metrics: tc.input}
metricproducer.GlobalManager().AddProducer(fakeProducer)
defer metricproducer.GlobalManager().DeleteProducer(fakeProducer)
output, err := NewMetricProducer().Produce(context.Background())
if tc.expectErr {
require.Error(t, err)
} else {
require.Nil(t, err)
}
require.Equal(t, len(output), len(tc.expected))
for i := range output {
metricdatatest.AssertEqual(t, tc.expected[i], output[i])
}
})
}
}

type fakeOCProducer struct {
metrics []*ocmetricdata.Metric
}

func (f *fakeOCProducer) Read() []*ocmetricdata.Metric {
return f.metrics
}

func TestPushMetricsExporter(t *testing.T) {
now := time.Now()
for _, tc := range []struct {
Expand Down
22 changes: 6 additions & 16 deletions example/opencensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

ocmetric "go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricexport"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
Expand All @@ -34,7 +33,6 @@ import (
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

Expand Down Expand Up @@ -103,20 +101,12 @@ func tracing(otExporter sdktrace.SpanExporter) {
// monitoring demonstrates creating an IntervalReader using the OpenTelemetry
// exporter to send metrics to the exporter by using either an OpenCensus
// registry or an OpenCensus view.
func monitoring(otExporter metric.Exporter) error {
log.Println("Using the OpenTelemetry stdoutmetric exporter to export OpenCensus metrics. This allows routing telemetry from both OpenTelemetry and OpenCensus to a single exporter.")
ocExporter := opencensus.NewMetricExporter(otExporter, resource.Default())
intervalReader, err := metricexport.NewIntervalReader(&metricexport.Reader{}, ocExporter)
if err != nil {
return fmt.Errorf("failed to create interval reader: %w", err)
}
intervalReader.ReportingInterval = 10 * time.Second
log.Println("Emitting metrics using OpenCensus APIs. These should be printed out using the OpenTelemetry stdoutmetric exporter.")
err = intervalReader.Start()
if err != nil {
return fmt.Errorf("failed to start interval reader: %w", err)
}
defer intervalReader.Stop()
func monitoring(exporter metric.Exporter) error {
log.Println("Adding the OpenCensus metric Producer to an OpenTelemetry Reader to export OpenCensus metrics using the OpenTelemetry stdout exporter.")
reader := metric.NewPeriodicReader(exporter)
// Register the OpenCensus metric Producer to add metrics from OpenCensus to the output.
reader.RegisterProducer(opencensus.NewMetricProducer())
metric.NewMeterProvider(metric.WithReader(reader))

log.Println("Registering a gauge metric using an OpenCensus registry.")
r := ocmetric.NewRegistry()
Expand Down
2 changes: 1 addition & 1 deletion example/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
_, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
n := -10. + rand.Float64()*(90.) // [-10, 100)
gauge.Observe(ctx, n, attrs...)
})
Expand Down
25 changes: 12 additions & 13 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ package otel // import "go.opentelemetry.io/otel"
import (
"log"
"os"
"sync"
"sync/atomic"
"unsafe"
)

var (
Expand All @@ -34,28 +35,26 @@ var (
)

type delegator struct {
lock *sync.RWMutex
eh ErrorHandler
delegate unsafe.Pointer
}

func (d *delegator) Handle(err error) {
d.lock.RLock()
defer d.lock.RUnlock()
d.eh.Handle(err)
d.getDelegate().Handle(err)
}

func (d *delegator) getDelegate() ErrorHandler {
return *(*ErrorHandler)(atomic.LoadPointer(&d.delegate))
}

// setDelegate sets the ErrorHandler delegate.
func (d *delegator) setDelegate(eh ErrorHandler) {
d.lock.Lock()
defer d.lock.Unlock()
d.eh = eh
atomic.StorePointer(&d.delegate, unsafe.Pointer(&eh))
}

func defaultErrorHandler() *delegator {
return &delegator{
lock: &sync.RWMutex{},
eh: &errLogger{l: log.New(os.Stderr, "", log.LstdFlags)},
}
d := &delegator{}
d.setDelegate(&errLogger{l: log.New(os.Stderr, "", log.LstdFlags)})
return d
}

// errLogger logs errors if no delegate is set, otherwise they are delegated.
Expand Down
6 changes: 3 additions & 3 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type HandlerTestSuite struct {

func (s *HandlerTestSuite) SetupSuite() {
s.errCatcher = new(testErrCatcher)
s.origHandler = globalErrorHandler.eh
s.origHandler = globalErrorHandler.getDelegate()

globalErrorHandler.setDelegate(&errLogger{l: log.New(s.errCatcher, "", 0)})
}
Expand Down Expand Up @@ -111,12 +111,12 @@ func (s *HandlerTestSuite) TestAllowMultipleSets() {
secondary := &errLogger{l: log.New(notUsed, "", 0)}
SetErrorHandler(secondary)
s.Require().Same(GetErrorHandler(), globalErrorHandler, "set changed globalErrorHandler")
s.Require().Same(globalErrorHandler.eh, secondary, "new Handler not set")
s.Require().Same(globalErrorHandler.getDelegate(), secondary, "new Handler not set")

tertiary := &errLogger{l: log.New(notUsed, "", 0)}
SetErrorHandler(tertiary)
s.Require().Same(GetErrorHandler(), globalErrorHandler, "set changed globalErrorHandler")
s.Assert().Same(globalErrorHandler.eh, tertiary, "user Handler not overridden")
s.Assert().Same(globalErrorHandler.getDelegate(), tertiary, "user Handler not overridden")
}

func TestHandlerTestSuite(t *testing.T) {
Expand Down
Loading

0 comments on commit 5eaaa5c

Please sign in to comment.