Verify compliant metric SDK specification implementation: MetricReader/MetricReader operations/Collect #3662

Closed
2 tasks done
Tracked by #3674
MrAlias opened this issue Feb 3, 2023 · 13 comments
Assignees
Labels
area:metrics Part of OpenTelemetry Metrics pkg:SDK Related to an SDK package

Comments

@MrAlias
Contributor

MrAlias commented Feb 3, 2023

  • Identify all the normative requirements, recommendations, and options the specification defines as comments to this issue
  • Ensure the current metric SDK implementation is compliant with these normative requirements, recommendations, and options in those comments.
@MrAlias MrAlias converted this from a draft issue Feb 3, 2023
@MrAlias MrAlias added pkg:SDK Related to an SDK package area:metrics Part of OpenTelemetry Metrics labels Feb 3, 2023
@dashpole
Contributor

I would like to take this.

@dashpole dashpole self-assigned this Jul 21, 2023
@dashpole dashpole moved this from Todo to In Progress in Go: Metric SDK (GA) Jul 21, 2023
@dashpole
Contributor

MetricReader is an SDK implementation object that provides the common configurable aspects of the OpenTelemetry Metrics SDK and determines the following capabilities:

  • Registering MetricProducer(s).

// RegisterProducer registers an external Producer with this Reader.
// The Producer is used as a source of aggregated metric data which is
// incorporated into metrics collected from the SDK.
//
// This method needs to be concurrent safe.
RegisterProducer(Producer)

  • Collecting metrics from the SDK and any registered MetricProducers on demand.

// Collect gathers and returns all metric data related to the Reader from
// the SDK and stores it in out. An error is returned if this is called
// after Shutdown or if out is nil.
//
// This method needs to be concurrent safe, and the cancelation of the
// passed context is expected to be honored.
Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error

  • Handling the ForceFlush and Shutdown signals from the SDK.

// ForceFlush flushes all metric measurements held in an export pipeline.
//
// The deadline or cancellation of the passed context is honored. An appropriate
// error will be returned in these situations. There is no guarantee that all
// telemetry will be flushed or that all resources will have been released in
// these situations.
//
// This method needs to be concurrent safe.
ForceFlush(context.Context) error
// Shutdown flushes all metric measurements held in an export pipeline and releases any
// held computational resources.
//
// The deadline or cancellation of the passed context is honored. An appropriate
// error will be returned in these situations. There is no guarantee that all
// telemetry will be flushed or that all resources will have been released in
// these situations.
//
// After Shutdown is called, calls to Collect will perform no operation and instead will return
// an error indicating the shutdown state.
//
// This method needs to be concurrent safe.
Shutdown(context.Context) error

@dashpole
Contributor

dashpole commented Jul 21, 2023

To construct a MetricReader when setting up an SDK, the caller SHOULD provide at least the following:

  • The exporter to use, which is a MetricExporter instance.

This option is ONLY exposed on periodic readers.

func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader {

  • The default output aggregation (optional), a function of instrument kind. If not configured, the default aggregation SHOULD be used.

This option is only provided on ManualReaders:

// WithAggregationSelector sets the AggregationSelector a reader will use to
// determine the aggregation to use for an instrument based on its kind. If
// this option is not used, the reader will use the DefaultAggregationSelector
// or the aggregation explicitly passed for a view matching an instrument.
func WithAggregationSelector(selector AggregationSelector) ManualReaderOption {

For PeriodicReaders, the MetricExporter provides the default output aggregation.

  • The default output temporality (optional), a function of instrument kind. If not configured, the Cumulative temporality SHOULD be used.

This option is only provided for ManualReaders:

// WithTemporalitySelector sets the TemporalitySelector a reader will use to
// determine the Temporality of an instrument based on its kind. If this
// option is not used, the reader will use the DefaultTemporalitySelector.
func WithTemporalitySelector(selector TemporalitySelector) ManualReaderOption {
	return temporalitySelectorOption{selector: selector}
}

For PeriodicReaders, the MetricExporter provides the default output temporality.

  • The default aggregation cardinality limit to use, a function of instrument kind. If not configured, a default value of 2000 SHOULD be used.

This option is not provided when constructing a MetricReader.
IMO, this should not be part of the stable specification.
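
To make the "optional, with a default" behavior of these options concrete, here is a minimal standalone sketch of the functional-option pattern. It is not the SDK's code: `reader`, `Option`, `WithSelector`, `newReader`, and the simplified `Temporality` and `InstrumentKind` types are hypothetical stand-ins, and ignoring a nil selector is an assumption made for the sketch.

```go
package main

import "fmt"

// Temporality and InstrumentKind are simplified stand-ins for the SDK types.
type Temporality int

const (
	Cumulative Temporality = iota
	Delta
)

type InstrumentKind int

const (
	Counter InstrumentKind = iota
	Histogram
)

// TemporalitySelector maps an instrument kind to an output temporality.
type TemporalitySelector func(InstrumentKind) Temporality

// defaultSelector returns Cumulative for every kind, which is the default
// the specification recommends when nothing is configured.
func defaultSelector(InstrumentKind) Temporality { return Cumulative }

// reader is a hypothetical manual reader that only holds its selector.
type reader struct {
	selector TemporalitySelector
}

// Option configures a reader.
type Option func(*reader)

// WithSelector overrides the default selector. Ignoring a nil selector is
// an assumption of this sketch.
func WithSelector(s TemporalitySelector) Option {
	return func(r *reader) {
		if s != nil {
			r.selector = s
		}
	}
}

// newReader applies the options on top of the default configuration.
func newReader(opts ...Option) *reader {
	r := &reader{selector: defaultSelector}
	for _, o := range opts {
		o(r)
	}
	return r
}

func main() {
	def := newReader()
	delta := newReader(WithSelector(func(InstrumentKind) Temporality { return Delta }))
	fmt.Println(def.selector(Counter) == Cumulative, delta.selector(Counter) == Delta)
}
```

A reader built without options falls back to the cumulative default, and one built with the option uses the supplied selector.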

@dashpole
Contributor

dashpole commented Jul 21, 2023

A common sub-class of MetricReader, the periodic exporting MetricReader SHOULD be provided to be used typically with push-based metrics collection.

We provide a PeriodicReader:

func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader {

@dashpole
Contributor

dashpole commented Jul 21, 2023

The MetricReader MUST ensure that data points from OpenTelemetry instruments are output in the configured aggregation temporality for each instrument kind. For synchronous instruments being output with Cumulative temporality, this means converting Delta to Cumulative aggregation temporality. For asynchronous instruments being output with Delta temporality, this means converting Cumulative to Delta aggregation temporality.

The periodic reader defaults to the exporter's temporality:

// temporality reports the Temporality for the instrument kind provided.
func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
	return r.exporter.Temporality(kind)

The manual reader defaults to the configured temporality:

// temporality reports the Temporality for the instrument kind provided.
func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality {
	return mr.temporalitySelector(kind)
}

The collection + aggregation pipeline uses the temporality of the reader:

b := aggregate.Builder[N]{
	Temporality: i.pipeline.reader.temporality(kind),

That temporality is used to ensure that the aggregation is output in the correct temporality:

switch b.Temporality {
case metricdata.DeltaTemporality:
	s = newPrecomputedDeltaSum[N](monotonic)
default:
	s = newPrecomputedCumulativeSum[N](monotonic)

I believe that satisfies the "MUST ensure that data points ... are output in the configured aggregation temporality" requirement, even though most of the conversion logic lives in the aggregation pipeline rather than in our Readers.
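
For anyone unfamiliar with what the Delta/Cumulative conversion amounts to, here is a small arithmetic sketch. It is only an illustration on plain integer series, not how the SDK's aggregators are written; `toCumulative` and `toDelta` are hypothetical helper names.

```go
package main

import "fmt"

// toCumulative converts per-interval deltas into running totals, which is
// what a synchronous instrument needs for Cumulative output.
func toCumulative(deltas []int64) []int64 {
	out := make([]int64, len(deltas))
	var sum int64
	for i, d := range deltas {
		sum += d
		out[i] = sum
	}
	return out
}

// toDelta converts running totals into per-interval changes, which is what
// an asynchronous instrument needs for Delta output.
func toDelta(cumulative []int64) []int64 {
	out := make([]int64, len(cumulative))
	var prev int64
	for i, c := range cumulative {
		out[i] = c - prev
		prev = c
	}
	return out
}

func main() {
	fmt.Println(toCumulative([]int64{3, 2, 5})) // [3 5 10]
	fmt.Println(toDelta([]int64{3, 5, 10}))     // [3 2 5]
}
```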

@dashpole
Contributor

The SDK MUST support multiple MetricReader instances to be registered on the same MeterProvider, and the MetricReader.Collect invocation on one MetricReader instance SHOULD NOT introduce side-effects to other MetricReader instances.

Multiple readers can be registered on a single MeterProvider (readers are appended to a list):

// WithReader associates Reader r with a MeterProvider.
//
// By default, if this option is not used, the MeterProvider will perform no
// operations; no data will be exported without a Reader.
func WithReader(r Reader) Option {
	return optionFunc(func(cfg config) config {
		if r == nil {
			return cfg
		}
		cfg.readers = append(cfg.readers, r)
		return cfg

Each reader results in an independent pipeline:

func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines {
	pipes := make([]*pipeline, 0, len(readers))
	for _, r := range readers {
		p := newPipeline(res, r, views)
		r.register(p)
		pipes = append(pipes, p)
	}
	return pipes

Invoking Collect on a reader only calls produce on its own pipeline. Periodic:

err := ph.produce(ctx, rm)

and Manual:

err := ph.produce(ctx, rm)

produce invokes only the pipeline's own callbacks and computes only the pipeline's own aggregations, both of which are independent of every other pipeline:

func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
	p.Lock()
	defer p.Unlock()
	var errs multierror
	for _, c := range p.callbacks {
		// TODO make the callbacks parallel. ( #3034 )
		if err := c(ctx); err != nil {
			errs.append(err)
		}
		if err := ctx.Err(); err != nil {
			rm.Resource = nil
			rm.ScopeMetrics = rm.ScopeMetrics[:0]
			return err
		}
	}
	for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
		// TODO make the callbacks parallel. ( #3034 )
		f := e.Value.(multiCallback)
		if err := f(ctx); err != nil {
			errs.append(err)
		}
		if err := ctx.Err(); err != nil {
			// This means the context expired before we finished running callbacks.
			rm.Resource = nil
			rm.ScopeMetrics = rm.ScopeMetrics[:0]
			return err
		}
	}
	rm.Resource = p.resource
	rm.ScopeMetrics = internal.ReuseSlice(rm.ScopeMetrics, len(p.aggregations))
	i := 0
	for scope, instruments := range p.aggregations {
		rm.ScopeMetrics[i].Metrics = internal.ReuseSlice(rm.ScopeMetrics[i].Metrics, len(instruments))
		j := 0
		for _, inst := range instruments {
			data := rm.ScopeMetrics[i].Metrics[j].Data
			if n := inst.compAgg(&data); n > 0 {
				rm.ScopeMetrics[i].Metrics[j].Name = inst.name
				rm.ScopeMetrics[i].Metrics[j].Description = inst.description
				rm.ScopeMetrics[i].Metrics[j].Unit = inst.unit
				rm.ScopeMetrics[i].Metrics[j].Data = data
				j++
			}
		}
		rm.ScopeMetrics[i].Metrics = rm.ScopeMetrics[i].Metrics[:j]
		if len(rm.ScopeMetrics[i].Metrics) > 0 {
			rm.ScopeMetrics[i].Scope = scope
			i++
		}
	}
	rm.ScopeMetrics = rm.ScopeMetrics[:i]
	return errs.errorOrNil()
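
To illustrate why one pipeline per reader avoids side effects between readers, here is a deliberately tiny model. It is not the SDK's pipeline: the `pipeline` and `provider` types below are hypothetical, they hold a single delta sum, and they skip the locking the real code does.

```go
package main

import "fmt"

// pipeline is a simplified stand-in that owns its own aggregation state.
type pipeline struct {
	sum int64
}

// record adds a measurement to this pipeline's state only.
func (p *pipeline) record(v int64) { p.sum += v }

// produce returns the accumulated delta and resets only this pipeline.
func (p *pipeline) produce() int64 {
	v := p.sum
	p.sum = 0
	return v
}

// provider fans each measurement out to every pipeline, one per reader.
type provider struct {
	pipes []*pipeline
}

// newProvider creates one independent pipeline per reader.
func newProvider(readers int) *provider {
	p := &provider{}
	for i := 0; i < readers; i++ {
		p.pipes = append(p.pipes, &pipeline{})
	}
	return p
}

// record delivers the measurement to every pipeline.
func (p *provider) record(v int64) {
	for _, pipe := range p.pipes {
		pipe.record(v)
	}
}

func main() {
	mp := newProvider(2)
	mp.record(5)
	fmt.Println(mp.pipes[0].produce()) // collects and resets the first pipeline only
	mp.record(1)
	fmt.Println(mp.pipes[0].produce(), mp.pipes[1].produce())
}
```

Collecting from the first pipeline resets its own state, while the second pipeline still reports everything recorded since its own last collection.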

@dashpole
Contributor

The SDK MUST NOT allow a MetricReader instance to be registered on more than one MeterProvider instance.

We log this error, and keep the original registration, when a reader is registered more than once (e.g. with different MeterProviders):

var errDuplicateRegister = fmt.Errorf("duplicate reader registration")

Periodic Reader impl:

// register registers p as the producer of this reader.
func (r *PeriodicReader) register(p sdkProducer) {
	// Only register once. If producer is already set, do nothing.
	if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
		msg := "did not register periodic reader"
		global.Error(errDuplicateRegister, msg)
	}
}

Manual Reader impl:

// register stores the sdkProducer which enables the caller
// to read metrics from the SDK on demand.
func (mr *ManualReader) register(p sdkProducer) {
	// Only register once. If producer is already set, do nothing.
	if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
		msg := "did not register manual reader"
		global.Error(errDuplicateRegister, msg)
	}
}
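
The register-once behavior relies on `atomic.Value.CompareAndSwap(nil, ...)` succeeding only for the first store. A standalone sketch of that mechanism, with hypothetical `holder` and `reader` types rather than the SDK's, could look like this:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// holder wraps the produce function so that every value stored in the
// atomic.Value has the same concrete type.
type holder struct {
	produce func() string
}

// reader is a hypothetical reader that accepts a single producer.
type reader struct {
	producer atomic.Value
}

// register stores produce only if nothing was stored before and reports
// whether this call won the registration.
func (r *reader) register(produce func() string) bool {
	return r.producer.CompareAndSwap(nil, holder{produce: produce})
}

func main() {
	var r reader
	first := r.register(func() string { return "provider A" })
	second := r.register(func() string { return "provider B" })
	fmt.Println(first, second)
	fmt.Println(r.producer.Load().(holder).produce())
}
```

The second registration is rejected and the first producer stays in place, so a reader handed to two MeterProviders only ever reads from the first one.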

@dashpole
Contributor

The SDK SHOULD provide a way to allow MetricReader to respond to MeterProvider.ForceFlush and MeterProvider.Shutdown.

The ForceFlush and Shutdown signals of readers are aggregated when the MeterProvider is created:

// readerSignals returns a force-flush and shutdown function for a
// MeterProvider to call in their respective options. All Readers c contains
// will have their force-flush and shutdown methods unified into returned
// single functions.
func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) {
	var fFuncs, sFuncs []func(context.Context) error
	for _, r := range c.readers {
		sFuncs = append(sFuncs, r.Shutdown)
		fFuncs = append(fFuncs, r.ForceFlush)
	}
	return unify(fFuncs), unifyShutdown(sFuncs)

func NewMeterProvider(options ...Option) *MeterProvider {
	conf := newConfig(options)
	flush, sdown := conf.readerSignals()
	mp := &MeterProvider{
		pipes:      newPipelines(conf.res, conf.readers, conf.views),
		forceFlush: flush,
		shutdown:   sdown,

These unified functions are then called when ForceFlush and Shutdown are invoked on the MeterProvider:

func (mp *MeterProvider) ForceFlush(ctx context.Context) error {
	if mp.forceFlush != nil {
		return mp.forceFlush(ctx)
	}
	return nil

func (mp *MeterProvider) Shutdown(ctx context.Context) error {
	// Even though it may seem like there is a synchronization issue between the
	// call to `Store` and checking `shutdown`, the Go concurrency model ensures
	// that is not the case, as all the atomic operations executed in a program
	// behave as though executed in some sequentially consistent order. This
	// definition provides the same semantics as C++'s sequentially consistent
	// atomics and Java's volatile variables.
	// See https://go.dev/ref/mem#atomic and https://pkg.go.dev/sync/atomic.
	mp.stopped.Store(true)
	if mp.shutdown != nil {
		return mp.shutdown(ctx)
	}
	return nil
}

@dashpole
Contributor

MetricReader Operations:

Collect

func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {

func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {

Collects the metrics from the SDK and any registered MetricProducers.

From SDK:

err := ph.produce(ctx, rm)

err := ph.produce(ctx, rm)

From Producers:

for _, producer := range mr.externalProducers.Load().([]Producer) {
	externalMetrics, err := producer.Produce(ctx)
	if err != nil {
		errs = append(errs, err)
	}
	rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}

for _, producer := range r.externalProducers.Load().([]Producer) {
	externalMetrics, err := producer.Produce(ctx)
	if err != nil {
		errs = append(errs, err)
	}
	rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}

Collect SHOULD provide a way to let the caller know whether it succeeded, failed or timed out.

Collect returns an error. See signature above.

When the Collect operation fails or times out on some of the instruments, the SDK MAY return successfully collected results and a failed reasons list to the caller.

We support partial results: when a callback returns an error, we record the error and continue with the remaining callbacks:

if err := c(ctx); err != nil {
	errs.append(err)
}

if err := f(ctx); err != nil {
	errs.append(err)
}

@dashpole
Contributor

Shutdown

// Shutdown flushes all metric measurements held in an export pipeline and releases any
// held computational resources.
//
// The deadline or cancellation of the passed context is honored. An appropriate
// error will be returned in these situations. There is no guarantee that all
// telemetry will be flushed or that all resources will have been released in
// these situations.
//
// After Shutdown is called, calls to Collect will perform no operation and instead will return
// an error indicating the shutdown state.
//
// This method needs to be concurrent safe.
Shutdown(context.Context) error

Shutdown MUST be called only once for each MetricReader instance. After the call to Shutdown, subsequent invocations to Collect are not allowed. SDKs SHOULD return some failure for these calls, if possible.

In Shutdown(), each reader replaces its stored producer with a shutdownProducer, whose produce method returns an error that is passed back to the caller of Collect:

// Any future call to Collect will now return ErrReaderShutdown.
mr.sdkProducer.Store(produceHolder{
	produce: shutdownProducer{}.produce,
})

// Any future call to Collect will now return ErrReaderShutdown.
ph := r.sdkProducer.Swap(produceHolder{
	produce: shutdownProducer{}.produce,
})

Shutdown SHOULD provide a way to let the caller know whether it succeeded, failed or timed out.

Shutdown returns an error.

Shutdown SHOULD complete or abort within some timeout.

Shutdown does not impose a timeout of its own on the collection or export of telemetry in the periodic reader.

It does accept a context, which allows users to easily place a timeout on collection or export.

@dashpole
Contributor

That is all for the MetricReader, MetricReader operations, Collect, and Shutdown sections. The bolded text above marks points that are contentious or where we differ from the specification. Our only differences occur in SHOULD sections.

@dashpole
Contributor

I think I misread the intention of this issue. If this is only about verifying Collect, see #3662 (comment).

I did not find any issues regarding the Collect function.

@MrAlias
Contributor Author

MrAlias commented Jul 27, 2023

The targeted material has all been identified and checked to be compliant. Closing.

@MrAlias MrAlias closed this as completed Jul 27, 2023
@github-project-automation github-project-automation bot moved this from In Progress to Done in Go: Metric SDK (GA) Jul 27, 2023
@MrAlias MrAlias added this to the v1.17.0/v0.40.0 milestone Aug 3, 2023