Experiment with an errors.Join-based implementation
evan-bradley committed Jan 8, 2024
1 parent 019256d commit d417f44
Showing 5 changed files with 146 additions and 224 deletions.
104 changes: 33 additions & 71 deletions consumer/consumererror/README.md
@@ -19,106 +19,68 @@ necessary for the Collector to act as a proxy for a backend, i.e. relay a status
code returned from a backend in a response to a system upstream from the
Collector.

## Creating an error
## Creating Errors

To create a new error, call the `consumererror.NewError` function with the error
you want to wrap and any options that add additional metadata. Errors are either
considered permanent or retryable, depending on whether the
`WithRetryable[Signal]` option is passed. A permanent error indicates that the
error will always occur for a given set of telemetry items and should be
dropped. The permanence of an error can be checked with
`consumererror.IsPermanent`.
Errors can be created through one of the four constructors provided:

To help communicate information about the error to the caller, options with
additional metadata may be included. While some of the options are semantically
mutually-exclusive and shouldn't be combined, any set of options can be used
together and the package will determine which option takes precedence.
- `consumererror.New[Signal]` for retryable errors.
- `consumererror.NewPartial` for errors where only some of the items failed.
- `consumererror.NewHTTPStatus` for errors resulting from an HTTP call with an error status code.
- `consumererror.NewGRPCStatus` for errors resulting from a gRPC call with an error status code.

### WithRejectedCount(rejected int)
Any error that is not retryable is considered to be a permanent error and will not be retried.

Include a count of the items that were permanently rejected (spans,
datapoints, log records, etc.). The caller should have a full count of rejected
items, so this option is only needed to indicate a partial success.
Errors can be joined by passing them to a call to `consumererror.New`.

When using this option in an exporter or other component dealing with mapping
non-pdata formats, the rejected count should be based on the count of pdata
items that failed.

### WithRetryable\[Signal\](pdata, ...RetryOption)

Indicate that a temporary condition is the cause of an error and that the
request should be retried with the default delay. Use of this option means that
an error is not considered permanent.

#### WithRetryDelay(delay time.Duration)

Indicate that the payload should be retried after a certain amount of time.

### WithHTTPStatus(int)

Annotate the error with an HTTP status code obtained from a response during
exporting.

### WithGRPCStatus(*status.Status)
## Other considerations

Annotate the error with a gRPC status code obtained from a response during
exporting.
To keep error analysis simple when looking at an error upstream in a pipeline,
the component closest to the source of an error or set of errors should make a
decision about the nature of the error. The following are a few places where
special considerations may need to be made.

## Reading from an error
### Fanouts

The `consumererror.Error` type supports the following methods. Each method has a
method signature like `(data, bool)` with the second value indicating whether
the option was passed during the error's creation.
Pipeline components that perform fanouts should determine for themselves the
precedence of errors they receive when multiple downstream components report an
error.

- `Count`: Get the count of rejected items.
- `Retryable[Signal]`: Gets the information necessary to retry the request for a
given signal.
- `ToHTTP`: Returns an integer representing the HTTP status code. If both
`WithHTTPStatus` and `WithGRPCStatus` were passed, the HTTP status is used
first and gRPC status second.
- `ToGRPC`: Returns a `*status.Status` object representing the status returned
from the gRPC server. If both `WithHTTPStatus` and `WithGRPCStatus` were
passed, the gRPC status is used first and HTTP status second.
### Signal conversion

## Other considerations
When converting between signals in a pipeline, it is expected that the component
performing the conversion should perform the translation necessary in the error
for any signal item counts.

### Asynchronous processing

Note that the use of any components that do asynchronous processing, for example
the batch processor, will cut off the upward flow of information at the
asynchronous component. This means that something like a network status code
cannot be propagated from an exporter to a receiver if the batch processor is
used in the pipeline.
Note that the use of any components that do asynchronous processing will cut off
the upward flow of information at the asynchronous component.

## Examples

Creating an error:

```golang
consumererror.NewError(
consumererror.WithRetryableTraces(
consumererror.New(
consumererror.NewTraces(
error,
traces,
consumererror.WithRetryDelay(2 * time.Second)
),
consumererror.WithHTTPStatus(http.StatusTooManyRequests)
consumererror.NewHTTPStatus(error, http.StatusTooManyRequests)
)
```

Using an error:

```golang
err := nextConsumer(ctx, traces)
var status *consumererror.StatusError
retry := consumererror.Traces{}

if cErr, ok := consumererror.As(err); ok {
code, ok := cErr.ToHTTP()
if ok {
statusCode = code
}

retry, ok := cErr.RetryableTraces()

if ok {
doRetry(retry)
}
if errors.As(err, &status) {
if status.HTTPStatus() >= 500 && errors.As(err, &retry) {
doRetry(retry.Data())
}
}
```
167 changes: 30 additions & 137 deletions consumer/consumererror/consumererror.go
@@ -5,164 +5,57 @@ package consumererror // import "go.opentelemetry.io/collector/consumer/consumer

import (
"errors"
"time"
"fmt"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"google.golang.org/grpc/status"
)

type Error struct {
error
retryable bool
rejected int
delay time.Duration
hasTraces bool
traces ptrace.Traces
hasMetrics bool
metrics pmetric.Metrics
hasLogs bool
logs plog.Logs
httpStatus int
grpcStatus *status.Status
}

var noDataCount = -1

// ErrorOption allows annotating an Error with metadata.
type ErrorOption func(error *Error)

// NewConsumerError wraps an error that happened while consuming telemetry
// and adds metadata onto it to be passed back up the pipeline.
func NewConsumerError(origErr error, options ...ErrorOption) error {
// Create an Error with `new` to force heap allocation.
err := new(Error)
err.error = origErr
err.rejected = noDataCount

cErr := new(Error)
if errors.As(origErr, cErr) {
err.copyMetadata(cErr)
}

for _, option := range options {
option(err)
}

return err
}

func (e *Error) Error() string {
return e.error.Error()
}

// Unwrap returns the wrapped error for use by `errors.Is` and `errors.As`.
func (e *Error) Unwrap() error {
return e.error
func New(errs ...error) error {
return errors.Join(errs...)
}

func (e *Error) Rejected() int {
return e.rejected
}

func (e *Error) Delay() time.Duration {
return e.delay
}

func (e *Error) RetryableTraces() (ptrace.Traces, bool) {
return e.traces, e.hasTraces
}

func (e *Error) RetryableMetrics() (pmetric.Metrics, bool) {
return e.metrics, e.hasMetrics
}

func (e *Error) RetryableLogs() (plog.Logs, bool) {
return e.logs, e.hasLogs
}

func (e *Error) ToHTTP() int {
// todo: translate gRPC to HTTP status
return e.httpStatus
}

func (e *Error) ToGRPC() *status.Status {
// todo: translate HTTP to gRPC status
return e.grpcStatus
type StatusError struct {
error
httpStatus *int
grpcStatus *status.Status
}

// IsPermanent checks if an error was wrapped with the NewPermanent function, which
// is used to indicate that a given error will always be returned in the case
// that its source receives the same input.
func IsPermanent(err error) bool {
if err == nil {
return false
func (se *StatusError) Error() string {
if se.httpStatus != nil {
return fmt.Sprintf("HTTP Status %d", *se.httpStatus)
} else {
return fmt.Sprintf("gRPC Status %s", se.grpcStatus.Code().String())
}

cErr := Error{}
if errors.As(err, &cErr) {
return !cErr.retryable
}

return false
}

func (e *Error) copyMetadata(err *Error) {
e.rejected = err.rejected
e.httpStatus = err.httpStatus
e.grpcStatus = err.grpcStatus
}

func WithRejectedCount(count int) ErrorOption {
return func(err *Error) {
err.rejected = count
func (se *StatusError) HTTPStatus() int {
if se.httpStatus != nil {
return *se.httpStatus
}
}

func WithHTTPStatus(status int) ErrorOption {
return func(err *Error) {
err.httpStatus = status
}
// TODO Convert gRPC to HTTP
return 0
}

func WithGRPCStatus(status *status.Status) ErrorOption {
return func(err *Error) {
err.grpcStatus = status
func (se *StatusError) GRPCStatus() *status.Status {
if se.grpcStatus != nil {
return se.grpcStatus
}
}

type RetryOption func(err *Error)

func WithRetryableTraces(td ptrace.Traces, options ...RetryOption) ErrorOption {
return func(err *Error) {
err.traces = td
err.hasTraces = true
err.retryable = true
for _, option := range options {
option(err)
}
}
// TODO Convert HTTP to gRPC
return &status.Status{}
}

func WithRetryableMetrics(md pmetric.Metrics, options ...RetryOption) ErrorOption {
return func(err *Error) {
err.metrics = md
err.hasMetrics = true
err.retryable = true
for _, option := range options {
option(err)
}
func NewHTTPStatus(err error, code int) error {
return &StatusError{
error: err,
httpStatus: &code,
}
}

func WithRetryableLogs(ld plog.Logs, options ...RetryOption) ErrorOption {
return func(err *Error) {
err.logs = ld
err.hasLogs = true
err.retryable = true
for _, option := range options {
option(err)
}
func NewGRPCStatus(err error, status *status.Status) error {
return &StatusError{
error: err,
grpcStatus: status,
}
}
24 changes: 24 additions & 0 deletions consumer/consumererror/partial.go
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror"

var noDataCount = -1

type Partial struct {
err error
count int
}

func NewPartial(err error, count int) error {
return Partial{err: err, count: count}
}

func (p Partial) Error() string {
return "Partial success: " + p.err.Error()
}

// Unwrap returns the wrapped error for functions Is and As in standard package errors.
func (p Partial) Unwrap() error {
return p.err
}
2 changes: 1 addition & 1 deletion consumer/consumererror/permanent.go
@@ -14,7 +14,7 @@ type permanent struct {
// NewPermanent wraps an error to indicate that it is a permanent error, i.e. an
// error that will be always returned if its source receives the same inputs.
func NewPermanent(err error) error {
return permanent{err: err}
return Partial{err: err}
}

func (p permanent) Error() string {