[consumer] Allow annotating consumer errors with metadata #9041

Open · wants to merge 45 commits into base: main

Conversation

@evan-bradley (Contributor)

Description:

Revival of #7439

This explores one possible way to allow adding metadata to errors returned from consumers. The goal is to transmit more data back up the pipeline when an error occurs at some stage, so that it can be used by an upstream component, e.g. a component that will retry data, or a receiver that will propagate an error code back to the sender.

The current design eliminates the permanent/retryable error types in favor of a single error type that supports adding signal data to be retried. If no data is added to be retried, the error is considered permanent. For simplicity, no distinction is currently made between the signals; the caller should know which signal is in use when retrieving the retryable items from the error. Any parameters for retrying the data (e.g. a delay) are offered as options when adding data to retry.

The error type currently supports a few general metadata fields that are copied when a downstream error is wrapped:

  • Partial successes can be expressed by setting the number of rejected items.
  • gRPC and HTTP status codes can be set and translated between if necessary.
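
To make the intended flow concrete, here is a minimal, self-contained sketch (not the API proposed in this PR; the metaError type and its fields are stand-ins) of a downstream component annotating an error with a status code and a rejected-item count, and an upstream component reading that metadata back out:

package main

import (
	"errors"
	"fmt"
	"net/http"
)

// metaError is a stand-in for the proposed error type; field names are illustrative.
type metaError struct {
	inner         error
	httpStatus    int // HTTP status reported by the backend
	rejectedItems int // number of items rejected (partial success)
}

func (e *metaError) Error() string { return e.inner.Error() }
func (e *metaError) Unwrap() error { return e.inner }

// export simulates a downstream exporter annotating its error with metadata.
func export() error {
	return &metaError{
		inner:         errors.New("backend rejected 2 of 10 records"),
		httpStatus:    http.StatusBadRequest,
		rejectedItems: 2,
	}
}

func main() {
	err := export()
	var me *metaError
	if errors.As(err, &me) {
		// An upstream receiver could translate this back to the sender,
		// e.g. as an OTLP partial success response.
		fmt.Printf("status=%d rejected=%d\n", me.httpStatus, me.rejectedItems)
	}
}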

Link to tracking Issue:

Resolves #7047

cc @dmitryax

@jmacd (Contributor) commented Jan 10, 2024

Happy to see this added. As discussed in #9260, there is a potential to propagate backwards the information contained in PartialSuccess responses from OTLP exports.

I worry about the code complexity introduced by having "success error" responses, meaning error != nil but the interpretation being success. However, this is what it will take to back-propagate partial successes: we want callers to see success with metadata about the number of rejected points where possible. Great to see this, thanks @evan-bradley.
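
For illustration, a small sketch of that interpretation (the partialSuccess interface is hypothetical, not part of this PR): a non-nil error carrying only partial-success metadata is still treated by the caller as a success, with a rejected-point count.

package example

import "errors"

// partialSuccess is a hypothetical interface for errors carrying
// partial-success metadata; it is not part of this PR's API.
type partialSuccess interface {
	error
	Rejected() int64
}

// handle treats a non-nil error that only carries partial-success
// metadata as a success with a rejected-point count.
func handle(err error) (success bool, rejected int64) {
	if err == nil {
		return true, 0
	}
	var ps partialSuccess
	if errors.As(err, &ps) {
		return true, ps.Rejected()
	}
	return false, 0
}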

@jmacd (Contributor) commented Jan 10, 2024

As discussed in open-telemetry/oteps#238, it would be useful for callers to have access to the underlying gRPC and/or HTTP error code so they can set the correct otel.outcome label. Thanks!
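
As a rough illustration (the outcomeLabel helper and the label values are assumptions for this sketch, not defined by the OTEP or this PR), a caller with access to the underlying gRPC code could derive such a label like this:

package example

import "google.golang.org/grpc/codes"

// outcomeLabel maps an underlying gRPC status code to an outcome label;
// the label values here are illustrative only.
func outcomeLabel(code codes.Code) string {
	switch code {
	case codes.OK:
		return "success"
	case codes.ResourceExhausted, codes.Unavailable, codes.DeadlineExceeded:
		return "retryable"
	default:
		return "rejected"
	}
}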

@mx-psi added the needed-for-1.0 and release:required-for-ga (Must be resolved before GA release) labels and removed the needed-for-1.0 label on Feb 7, 2024

codecov bot commented Apr 11, 2024

Codecov Report

Attention: Patch coverage is 95.45455% with 4 lines in your changes missing coverage. Please review.

Project coverage is 92.23%. Comparing base (5726563) to head (55ea049).

Files with missing lines Patch % Lines
...sumererror/internal/statusconversion/conversion.go 88.23% 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #9041      +/-   ##
==========================================
+ Coverage   92.22%   92.23%   +0.01%     
==========================================
  Files         409      411       +2     
  Lines       19134    19222      +88     
==========================================
+ Hits        17646    17730      +84     
- Misses       1127     1131       +4     
  Partials      361      361              


@evan-bradley marked this pull request as ready for review on April 11, 2024 19:31
@evan-bradley requested review from a team and mx-psi on April 11, 2024 19:31
@bogdandrutu (Member) left a comment

Overall feedback for me is that we just want to add more "things". I don't want to block this, but I also feel that we should make sure we add only the things we are 100% sure about.

Comment on lines +65 to +70
- `WithRetry` and `WithPartial` are included together: Partial successes are
considered permanent errors in OTLP, which conflicts with making an error
retryable by including `WithRetry`. However, per our definition of what makes
a permanent error, this error has been marked as retryable, and therefore we
assume the component producing this error supports retryable partial success
errors.

Member:

I don't have a real use-case in mind for this; what I want to cover in this section is that we can use the rules established here to handle seemingly invalid error combinations.

When in doubt leave it out. I want us to not add any API if we don't have a good use-case.

@bogdandrutu (Member) left a comment

Overall:

  1. I really like the MultiErr (current Error) approach and the APIs related to that. It is a nice way to encapsulate fanout errors.
  2. I am not sure if the ErrorData is the right thing though. Is this only for "Transport" errors? I would not return that bool but instead have an enum or a way to say "IsHttp" or "IsGrpc", then have "AsHttp" or "AsGrpc" (names to be discussed).

// out by an upstream component by calling `Error.Data`.
//
// Experimental: This API is at the early stage of development and may change without backward compatibility
type ErrorData interface {

Member:

What is the point of making it interface vs struct?

Contributor Author:

The goal is that component authors can't instantiate it themselves. If we're not concerned about that, I don't see any issues making it a struct.
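
For reference, a minimal sketch of that distinction (names are placeholders): an exported interface with an unexported method can only be satisfied by types inside the consumererror package, so component authors cannot instantiate it themselves.

package consumererror

// ErrorData is exported for reading, but the unexported method prevents
// other packages from implementing or constructing it themselves.
type ErrorData interface {
	HTTPStatus() (int, bool)
	sealed()
}

// errorData is the only implementation, owned by this package.
type errorData struct {
	httpStatus *int
}

func (e *errorData) HTTPStatus() (int, bool) {
	if e.httpStatus == nil {
		return 0, false
	}
	return *e.httpStatus, true
}

func (e *errorData) sealed() {}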

}

// ErrorOption allows annotating an Error with metadata.
type ErrorOption func(error *errorData)

Member:

Not recommended to have a public type that wraps a private type. I think the best way to do this is to use the interface pattern, see

type FactoryOption interface {

Contributor Author:

I think I like this a bit better, thanks. Implemented.
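
For context, a rough sketch of the interface-based option pattern referenced above (placeholder names; the collector's actual FactoryOption code may differ):

package consumererror

type errorData struct {
	httpStatus int
}

// ErrorOption allows annotating an Error with metadata without exposing
// the underlying errorData type in its signature.
type ErrorOption interface {
	applyOption(*errorData)
}

// errorOptionFunc adapts a function to the ErrorOption interface.
type errorOptionFunc func(*errorData)

func (f errorOptionFunc) applyOption(e *errorData) { f(e) }

// WithHTTPStatus is an illustrative option constructor.
func WithHTTPStatus(code int) ErrorOption {
	return errorOptionFunc(func(e *errorData) { e.httpStatus = code })
}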

errors []ErrorData
}

// ErrorData is intended to be used to encapsulate various information

Member:

Would call this "Error" and the other "Errors"(or ErrorSlice)?

Maybe "MultiErr" for the container? Not sure, but throwing ideas.

Contributor Author:

I've renamed them to ErrorContainer (for the type that accumulates errors as they move upstream) and Error for the actual error with the data. What do you think?

// Data returns all the accumulated ErrorData errors
// emitted by components downstream in the pipeline.
// These can then be aggregated or worked with individually.
func (e *Error) Data() []ErrorData {

Member:

This is wrong, I think; we should not return the internal slice, since the caller can modify it.

Contributor Author:

From what I can tell, this creates a new slice instance where modifications don't affect the original slice.

I added a test here that seems to show this isn't an issue; is there anything else I should check for to ensure we don't pass out a mutable copy of the slice? https://github.com/open-telemetry/opentelemetry-collector/pull/9041/files#diff-8c40d97e3ff1a76b10c0c2b02a2cea669f1ffdffc2363cfdf86ba8489cb9f59cR92
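
For what it's worth, returning a defensive copy sidesteps the question entirely. A minimal sketch with placeholder types:

package consumererror

type ErrorData interface {
	sealed()
}

type Error struct {
	errors []ErrorData
}

// Data returns a copy of the accumulated errors so the caller cannot
// mutate the internal slice. slices.Clone (Go 1.21+) would also work.
func (e *Error) Data() []ErrorData {
	out := make([]ErrorData, len(e.errors))
	copy(out, e.errors)
	return out
}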

evan-bradley and others added 2 commits August 9, 2024 08:05
Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
@evan-bradley (Contributor Author) commented Aug 9, 2024

Overall feedback for me is that we just want to add more "things". I don't want to block this, but I also feel that we should make sure we add only the things we are 100% sure about.

Right now we're only adding HTTP/gRPC status code propagation, everything else is marked as unimplemented and still in the design stage. Do you have concerns with HTTP/gRPC status codes, or the way they're communicated?

I really like the MultiErr (current Error) approach and the APIs related to that. It is a nice way to encapsulate fanout errors.

I played with that, but the issue I encountered is that we may want to read these error types at the receiver level, when there may be multiple fanouts in the pipeline. So you could see something like the following:

I don't see how we pull out the joined errors without traversing the whole error tree by repeatedly calling Unwrap. Having a container type to keep track of all the errors just seems simpler.

I see now, I mistook what you said for the Uber multierr package. Glad you like it.

I am not sure if the ErrorData is the right thing though. Is this only for "Transport" errors? I would not return that bool but instead have an enum or a way to say "IsHttp" or "IsGrpc", then have "AsHttp" or "AsGrpc" (names to be discussed).

This is meant to hold information related to anything the component would like to communicate upstream. Right now we're starting with HTTP and gRPC status codes, but we would like to add retry information, partial success information, etc. in the future. The goal is to start small and iterate as we design the additional options.

@bogdandrutu (Member)

This is meant to hold information related to anything the component would like to communicate upstream. Right now we're starting with HTTP and gRPC status codes, but we would like to add retry information, partial success information, etc. in the future. The goal is to start small and iterate as we design the additional options.

Can you please respond to the rest of the comments and see how the design of the "ErrorData" would look like?

@evan-bradley (Contributor Author)

Can you please respond to the rest of the comments and see how the design of the "ErrorData" would look like?

Sure thing. The other questions were based on the first question, so I wanted to make sure we were on the same page there if possible.

I am not sure if the ErrorData is the right thing though. Is this only for "Transport" errors? I would not return that bool but instead have an enum or a way to say "IsHttp" or "IsGrpc", then have "AsHttp" or "AsGrpc" (names to be discussed).

  1. Enum: We could do a list of enums, e.g. [IS_RETRYABLE, IS_GRPC], then expose this with a method like Types() []Type. Since an error can contain multiple error types this would have to be a list instead of just a single enum value.
  2. IsHTTP + AsHTTP: This essentially conveys the same information as HTTPStatus() (code int, isHTTP bool), so from a functionality standpoint it's not an issue. I would rename IsHTTP to HasHTTP though, since errors can contain multiple types of data. I'm not sure I see the benefit of retrieving this information across two methods, though; what's wrong with the boolean?
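
For comparison, the two accessor shapes under discussion might look roughly like this (hypothetical methods on the proposed type):

package consumererror

type Error struct {
	httpStatus int
	hasHTTP    bool
}

// Shape 1: a single comma-ok accessor.
func (e *Error) HTTPStatus() (code int, ok bool) {
	return e.httpStatus, e.hasHTTP
}

// Shape 2: the same information split across two methods.
func (e *Error) HasHTTP() bool { return e.hasHTTP }
func (e *Error) AsHTTP() int   { return e.httpStatus }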

@bogdandrutu (Member) commented Aug 11, 2024

Enum: We could do a list of enums, e.g. [IS_RETRYABLE, IS_GRPC], then expose this with a method like Types() []Type. Since an error can contain multiple error types this would have to be a list instead of just a single enum value.
IsHTTP + AsHTTP: This essentially conveys the same information as HTTPStatus() (code int, isHTTP bool), so from a functionality standpoint it's not an issue. I would rename IsHTTP to HasHTTP though, since errors can contain multiple types of data. I'm not sure I see the benefit of retrieving this information across two methods, though; what's wrong with the boolean?

Before answering any of these questions, I need to better understand how chained consumers should "correctly" (recommended, or whatever the right term is) report errors. To know the "right" design, I need to better understand the usage. Here are some examples; we can debate what the correct way is for each (and eventually document it). For these examples, assume the signal is logs:

First Example

A -> B (drops logs with invalid trace identifiers) -> C -> D -> E (exports to Loki)

For a request with 10 logs:

  • B finds 2 "logs" that are not valid, for example "trace.id" is 34 hex characters.
  • E tries to export the logs and 2 out of the 8 remaining (assuming B dropped the 2 invalid logs) are rejected with a 400 HTTP response.

Can you please tell me exactly which component should propagate/append errors and how?

Second Example

                       / -> E - > F
        / ->  C -> D -
A -> B -               \ -> G -> H
        \
         \ -> I -> J -> K

For a request with 10 logs:

  • F fails to send 4 logs with a 429 HTTP response.
  • H fails to send 2 logs with a 400 HTTP response.
  • D drops 2 logs because of invalid trace identifiers (trace.id too large).
  • K fails to send 1 log with an INVALID_ARGUMENT gRPC status code.

Can you please tell me exactly which component should propagate/append errors and how?

@evan-bradley (Contributor Author) commented Aug 26, 2024

Thank you for the in-depth examples and illustrations. Without going too deep into the implementation details of how this happens, here's how I see each example playing out. I'm assuming both pipelines are synchronous, as any asynchronous processing will cut error propagation at that point.

First example

A -> B (drops logs with invalid trace identifiers) -> C -> D -> E (exports to Loki)
  1. B notes that 2 items failed out of 10 total items.
  2. E receives the 400 response with 2/8 records failing and returns an error, without any retry information, indicating the response code and the partial success.
  3. The error is passed upstream and reaches B. Since B also experienced a partial success, it creates a new error object indicating a partial success and adds this to the error it received. We can discuss the exact mechanism for this separately, but for now I'm assuming that we have a single error object that is continually passed up the chain to aggregate individual errors like this.
  4. Somewhere between B recording its partial success and A receiving the error, partial success data is aggregated to indicate 4/10 records failed for this pipeline branch, along with any explanation messages from E or B. This is done by a common library shared by components.
  5. Assuming A is an OTLP receiver, it ignores the downstream code and returns a 200-equivalent status code with a partial success message indicating 4/10 records failed.

Second example

                       / -> E - > F
        / ->  C -> D -
A -> B -               \ -> G -> H
        \
         \ -> I -> J -> K
  1. D drops the two logs and notes this for later when it receives a response from its downstream consumer(s).
  2. F returns both partial success information with 4/8 logs failing, and returns that it received a 429 error code. If it considers this retryable, it returns retry information. We assume F is not an OTLP exporter since 429 codes and partial successes aren't compatible in OTLP.
  3. H returns an error indicating partial success with 2/8 logs failing, and a 400 error code without any retry information assuming it is using OTLP.
  4. D records its partial success information. An internal library could aggregate the top-level count as 6/10 logs failing, choosing the larger number of failed records for the top-level count. All three errors could be captured inside the OTLP partial success payload. All three errors are kept inside the error container in case an upstream receiver uses a protocol that communicates more detailed information about partial successes.
  5. K records that 1/10 logs failed to send and returns an error indicating the status code without retry information, assuming it communicates over OTLP.
  6. The error is returned to A. If it is configured (likely using a common library) to retry errors within the Collector, it retries the pipeline branch that goes to F since F requested a retry. The branches for H and K will not be retried. If it is configured to return the error to the caller and uses OTLP, it ignores the downstream code and returns a 200-equivalent status code with a partial success message indicating 6/10 failing logs (the highest number) with error information from D, F, H, and K inside the error body.

@TylerHelmuth (Member)

it retries the pipeline branch that goes to F since F requested a retry. The branches for H and K will not be retried.

How would receiver A do this? It doesn't have the ability when calling ConsumeLogs to specify which path the data should take.

My gut is that the receiver should not retry data if it receives any PartialSuccess or PermanentError data from the error.

@evan-bradley (Contributor Author)

How would receiver A do this? It doesn't have the ability when calling ConsumeLogs to specify which path the data should take.

The necessary pipeline topology information would be contained within the error container and would be passed in the context.Context object when the receiver calls the ConsumeLogs function. This would be baked into a shared library so each receiver doesn't have to implement this logic.

My gut is that the receiver should not retry data if it receives any PartialSuccess or PermanentError data from the error.

I think it's ultimately up to the exporter to determine what combinations of options it supports. It's possible that exporter F uses a protocol where the backend supports idempotent submissions and retryable partial successes are possible.

That said, if receiver A is an OTLP receiver and has been configured to pass retry information upstream from the Collector, then it will need to determine whether to retry or return a partial success message since these are mutually exclusive in OTLP. In this case, not retrying seems like the safest course of action, though there could be some nuances here we could discuss in the future.

@sfc-gh-bdrutu

The necessary pipeline topology information would be contained within the error container and would be passed in the context.Context object when the receiver calls the ConsumeLogs function. This would be baked into a shared library so each receiver doesn't have to implement this logic.

If we want to support retries, let's actually configure the "retries" in the components that support them and potentially add more components that can do retries. If you want crazy ideas, let's push them with the request vs supporting dynamic routing.

@evan-bradley (Contributor Author) commented Aug 27, 2024

If we want to support retries, let's actually configure the "retries" in the components that support them and potentially add more components that can do retries. If you want crazy ideas, let's push them with the request vs supporting dynamic routing.

In general I would agree that retries should be done within the component that needs to do retries, but the goal here is more about configuring scraping receivers that aren't responding to a request. For example, the filelog receiver could throttle reading based on throttling information from downstream endpoints: open-telemetry/opentelemetry-collector-contrib#20864. The reason we need routing like this is to make sure we don't replay requests on exporters that have had successful submissions while retrying exporters that want a retry.

cc @dmitryax since you have the most context here.

@evan-bradley (Contributor Author)

@bogdandrutu As discussed offline, here are some code snippets showing roughly how the above examples would work using something similar to the proposed API in this PR. I've tried to keep them as short as possible while still being nearly-runnable code. Note that a lot of this still hasn't been implemented within this PR.

Example 1:

// A -> B (drops logs with invalid trace identifiers) -> C -> D -> E (exports to Loki)

// Shared library consumption function akin to consumerretry, fanoutconsumer, etc.
func ConsumeLogs(ctx context.Context, ld plog.Logs) error {
	err := processorB(ctx, ld)

	cerr := consumererror.ErrorContainer{}

	if errors.As(err, &cerr) {
		errs := cerr.Errors()

		partialCount := 0

		for _, e := range errs {
			count, ok := e.Partial()

			if ok {
				partialCount = partialCount + count
			}
		}

		if partialCount > 0 {
			cerr.SetPartialSuccessAggregate(partialCount)
		}
	}

	return err
}

func receiverA(ctx context.Context, ld plog.Logs) error {
	err := ConsumeLogs(ctx, ld)

	cerr := consumererror.ErrorContainer{}

	if errors.As(err, &cerr) {
		if cerr.IsPartialSuccess() {
			// Ignore the 400 error code because OTLP partial successes use 200 status codes.
			// (Illustrative: stands in for writing the HTTP response back to the caller.)
			return http.Response{code: 200, body: fmt.Sprintf("Partial success: %d out of %d logs failed", cerr.PartialCount(), ld.LogRecordCount())}
		}
	}

	return err
}

func processorB(ctx context.Context, ld plog.Logs) error {
	failedItems := 2
	processorErr := consumererror.New(
		errors.New("Logs had trace IDs that failed validation"),
		consumererror.WithPartial(failedItems),
	)

	err := exporterE(ctx, ld)

	if err != nil {
		return consumererror.Combine(err, processorErr)
	}

	return processorErr
}

func exporterE(ctx context.Context, ld plog.Logs) error {
	// err stands in for the underlying error returned by the backend.
	return consumererror.New(err,
		consumererror.WithPartial(2),
		consumererror.WithHTTPStatus(400),
	)
}

Example 2:

//                        / -> E - > F
//         / ->  C -> D -
// A -> B -               \ -> G -> H
//         \
//          \ -> I -> J -> K

func receiverA(ctx context.Context, ld plog.Logs) error {
	err := processorB(ctx, ld)

	cerr := consumererror.ErrorContainer{}

	if errors.As(err, &cerr) {
		if cerr.ShouldRetry() {
			ctx = context.WithValue(ctx, "otelcol-retry-info", cerr.Retry())
			for i := 0; i < maxRetries; i++ {
				err := processorB(ctx, ld)
				if err != nil {
					// check the error and continue with the retry
					continue
				}

				return nil
			}

			return http.Response{code: 400, body: "could not submit data"}
		} else {
			return http.Response{code: cerr.HttpStatusCode(), body: fmt.Sprintf("Partial success: %d out of %d logs failed", cerr.PartialCount(), ld.LogRecordCount())}
		}
	}

	return err
}

func fanoutConsumerB(ctx context.Context, ld plog.Logs) error {
	exporters := []consumer.ConsumeLogsFunc{processorD, exporterK}
	var allErrs error
	for _, e := range exporters {
		err := e(ctx, ld)

		if err != nil {
			allErrs = consumererror.Combine(allErrs, err)
		}
	}

	// TODO: Aggregate partial success counts if present

	return allErrs
}

func processorB(ctx context.Context, ld plog.Logs) error {
	return fanoutConsumerB(ctx, ld)
}

func fanoutConsumerD(ctx context.Context, ld plog.Logs) error {
	exporters := []consumer.ConsumeLogsFunc{exporterF, exporterH}
	var allErrs error
	for _, e := range exporters {
		err := e(ctx, ld)

		if err != nil {
			allErrs = consumererror.Combine(allErrs, err)
		}
	}

	// TODO: Aggregate partial success counts if present

	return allErrs
}

func processorD(ctx context.Context, ld plog.Logs) error {
	failedItems := 2
	processorErr := consumererror.New(
		errors.New("Logs had trace IDs that failed validation"),
		consumererror.WithPartial(failedItems),
	)


	err := fanoutConsumerD(ctx, ld)

	if err != nil {
		return consumererror.Combine(err, processorErr)
	}

	return processorErr
}

func exporterF(ctx context.Context, ld plog.Logs) error {
	// err stands in for the underlying error returned by the backend.
	return consumererror.New(err,
		consumererror.WithPartial(4),
		consumererror.WithHTTPStatus(429),
		consumererror.WithRetry(
			consumererror.WithRetryDelay(10*time.Second),
		),
	)
}

func exporterH(ctx context.Context, ld plog.Logs) error {
	// err stands in for the underlying error returned by the backend.
	return consumererror.New(err,
		consumererror.WithPartial(2),
		consumererror.WithHTTPStatus(400),
	)
}

func exporterK(ctx context.Context, ld plog.Logs) error {
	// err stands in for the underlying error returned by the backend.
	return consumererror.New(err,
		consumererror.WithPartial(1),
		consumererror.WithGRPCStatus(codes.InvalidArgument),
	)
}

@evan-bradley (Contributor Author)

We had a meeting on 2024-08-27 with the following outcomes:

  • I will merge the ErrorContainer and Error types into a single Error type from the user's perspective so both reads and writes occur on the same type.
  • Aggregating things like status codes and partial success counts needs to be accounted for early on, since components reading this data only want a single value.
  • Retries are a complex issue that we need to carefully consider before moving forward. We will still be leaving them out of this initial version, but the APIs may change if we determine that we don't need some of the sophistication required for functionality like retrying individual pipeline branches.
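
A rough sketch of the first outcome (all names are placeholders for a design still in flux): a single Error type whose write side is used by downstream components and whose read side is used by upstream ones.

package consumererror

// Error is both written by downstream components and read by upstream ones.
type Error struct {
	errs []error
}

// Append records a downstream error (write side).
func (e *Error) Append(err error) {
	e.errs = append(e.errs, err)
}

// Errors returns a copy of the accumulated downstream errors (read side).
func (e *Error) Errors() []error {
	out := make([]error, len(e.errs))
	copy(out, e.errs)
	return out
}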

Comment on lines 93 to 98
consumererror.New(err,
consumererror.WithRetry(
consumererror.WithRetryDelay(10 * time.Second),
),
consumererror.WithGRPCStatus(codes.InvalidArgument),
)

Member:

This example must be invalid. codes.InvalidArgument should not be retryable.

In general, it still seems like all these options provide too much flexibility, making it confusing to use and easy to misuse.

For example, why do gRPC and HTTP have to be options? Why can't we just have different constructors like consumererror.NewFromGRPC(grpc.Status)? The status should have the retry information, which we can retrieve if applicable. If we expect exporters to add extra info on top of the gRPC error, they can manually parse the gRPC status and create a collector "internal" error another way.
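
To make that suggestion concrete, a constructor-based shape could look roughly like the sketch below (hypothetical signatures; the set of retryable codes is illustrative, not the OTLP spec's list):

package consumererror

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// Error derives its metadata from the transport status it was built from.
type Error struct {
	grpcStatus *status.Status
}

// NewFromGRPC builds an error whose retryability comes from the gRPC status.
func NewFromGRPC(st *status.Status) *Error {
	return &Error{grpcStatus: st}
}

func (e *Error) Error() string { return e.grpcStatus.Message() }

// Retryable is inferred from the status code rather than set by the caller.
func (e *Error) Retryable() bool {
	switch e.grpcStatus.Code() {
	case codes.ResourceExhausted, codes.Unavailable, codes.DeadlineExceeded:
		return true
	default:
		return false
	}
}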

@evan-bradley (Contributor Author) commented Sep 3, 2024

This is an example of an intentionally unusual option combination, since we describe above how combinations that are invalid in OTLP are handled. I'll move this and change it to a more normal combination so we're not highlighting anything we wouldn't recommend. It is worth noting that this is only non-retryable per the OTLP spec, and other protocols could consider this a retryable code.

I still think options are the best path forward even if they allow states that are not valid in OTLP, but I'm open to exploring different approaches. A few questions on how we would transition from HTTP/gRPC options to constructors:

Why can't we just have different constructors like consumererror.NewFromGRPC(grpc.Status)? The status should have the retry information that we can retrieve if applicable.

Wouldn't this still be susceptible to the same issue, where an exporter creates a gRPC status with both codes.InvalidArgument and retryable information? Also, where would this information be given when using an HTTP constructor like consumer.NewFromHTTP?

If we expect exporters to add extra info on top of grpc error, they can manually parse the grpc status and create a collector "internal" error another way.

Could you give an example of how we would do this?

@TylerHelmuth (Member)

Discussion on this PR should be moved to #11085

Labels
release:required-for-ga Must be resolved before GA release
Projects
Status: Waiting for reviews
Development

Successfully merging this pull request may close these issues.

Investigate how to expose exporterhelper.NewThrottleRetry in the consumererror