diff --git a/.chloggen/partialsuccess-is-success.yaml b/.chloggen/partialsuccess-is-success.yaml new file mode 100644 index 00000000000..e5278c93f9a --- /dev/null +++ b/.chloggen/partialsuccess-is-success.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlpexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: PartialSuccess is treated as success, logged as warning. + +# One or more tracking issues or pull requests related to the change +issues: [9243] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index fb92eb4743d..769bcfc49cd 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -19,6 +19,7 @@ require ( go.opentelemetry.io/otel/metric v1.22.0 go.opentelemetry.io/otel/trace v1.22.0 go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.26.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 google.golang.org/grpc v1.60.1 google.golang.org/protobuf v1.32.0 @@ -79,7 +80,6 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.22.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.26.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index 2c60c39ee4a..5d826435e45 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -10,6 +10,7 @@ import ( "runtime" "time" + "go.uber.org/zap" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -97,7 +98,13 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { - return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: \"%s\" (%d rejected)", resp.PartialSuccess().ErrorMessage(), resp.PartialSuccess().RejectedSpans())) + // TODO: These should be counted as "rejected", similar to dropped items. + // https://github.com/open-telemetry/opentelemetry-collector/issues/9243 + // Note the logging key is chosen for consistency with ../exporterhelper. + e.settings.Logger.Warn("Partial success response", + zap.String("message", resp.PartialSuccess().ErrorMessage()), + zap.Int64("dropped_spans", resp.PartialSuccess().RejectedSpans()), + ) } return nil } @@ -110,7 +117,13 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) { - return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: \"%s\" (%d rejected)", resp.PartialSuccess().ErrorMessage(), resp.PartialSuccess().RejectedDataPoints())) + // TODO: These should be counted as "rejected", similar to dropped items. + // https://github.com/open-telemetry/opentelemetry-collector/issues/9243 + // Note the logging key is chosen for consistency with ../exporterhelper. + e.settings.Logger.Warn("Partial success response", + zap.String("message", resp.PartialSuccess().ErrorMessage()), + zap.Int64("dropped_data_points", resp.PartialSuccess().RejectedDataPoints()), + ) } return nil } @@ -123,7 +136,13 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { - return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: \"%s\" (%d rejected)", resp.PartialSuccess().ErrorMessage(), resp.PartialSuccess().RejectedLogRecords())) + // TODO: These should be counted as "rejected", similar to dropped items. + // https://github.com/open-telemetry/opentelemetry-collector/issues/9243 + // Note the logging key is chosen for consistency with ../exporterhelper. + e.settings.Logger.Warn("Partial success response", + zap.String("message", resp.PartialSuccess().ErrorMessage()), + zap.Int64("dropped_log_records", resp.PartialSuccess().RejectedLogRecords()), + ) } return nil } diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go index 419bb3a6467..055b00e6758 100644 --- a/exporter/otlpexporter/otlp_test.go +++ b/exporter/otlpexporter/otlp_test.go @@ -15,6 +15,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -247,6 +249,11 @@ func TestSendTraces(t *testing.T) { set := exportertest.NewNopCreateSettings() set.BuildInfo.Description = "Collector" set.BuildInfo.Version = "1.2.3test" + + // For testing the "Partial success" warning. + logger, observed := observer.New(zap.DebugLevel) + set.TelemetrySettings.Logger = zap.New(logger) + exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) require.NoError(t, err) require.NotNil(t, exp) @@ -310,7 +317,9 @@ func TestSendTraces(t *testing.T) { td = testdata.GenerateTraces(2) err = exp.ConsumeTraces(context.Background(), td) - assert.Error(t, err) + assert.NoError(t, err) + assert.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1) + assert.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "Partial success") } func TestSendTracesWhenEndpointHasHttpScheme(t *testing.T) { @@ -412,6 +421,11 @@ func TestSendMetrics(t *testing.T) { set := exportertest.NewNopCreateSettings() set.BuildInfo.Description = "Collector" set.BuildInfo.Version = "1.2.3test" + + // For testing the "Partial success" warning. + logger, observed := observer.New(zap.DebugLevel) + set.TelemetrySettings.Logger = zap.New(logger) + exp, err := factory.CreateMetricsExporter(context.Background(), set, cfg) require.NoError(t, err) require.NotNil(t, exp) @@ -484,7 +498,9 @@ func TestSendMetrics(t *testing.T) { // Send two metrics. md = testdata.GenerateMetrics(2) - assert.Error(t, exp.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, exp.ConsumeMetrics(context.Background(), md)) + assert.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1) + assert.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "Partial success") } func TestSendTraceDataServerDownAndUp(t *testing.T) { @@ -699,6 +715,11 @@ func TestSendLogData(t *testing.T) { set := exportertest.NewNopCreateSettings() set.BuildInfo.Description = "Collector" set.BuildInfo.Version = "1.2.3test" + + // For testing the "Partial success" warning. + logger, observed := observer.New(zap.DebugLevel) + set.TelemetrySettings.Logger = zap.New(logger) + exp, err := factory.CreateLogsExporter(context.Background(), set, cfg) require.NoError(t, err) require.NotNil(t, exp) @@ -770,5 +791,7 @@ func TestSendLogData(t *testing.T) { ld = testdata.GenerateLogs(2) err = exp.ConsumeLogs(context.Background(), ld) - assert.Error(t, err) + assert.NoError(t, err) + assert.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1) + assert.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "Partial success") } diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index b118b72f2a3..2d67d1b40ef 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -92,7 +92,7 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { return consumererror.NewPermanent(err) } - return e.export(ctx, e.tracesURL, request, tracesPartialSuccessHandler) + return e.export(ctx, e.tracesURL, request, e.tracesPartialSuccessHandler) } func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { @@ -101,7 +101,7 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro if err != nil { return consumererror.NewPermanent(err) } - return e.export(ctx, e.metricsURL, request, metricsPartialSuccessHandler) + return e.export(ctx, e.metricsURL, request, e.metricsPartialSuccessHandler) } func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { @@ -111,7 +111,7 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { return consumererror.NewPermanent(err) } - return e.export(ctx, e.logsURL, request, logsPartialSuccessHandler) + return e.export(ctx, e.logsURL, request, e.logsPartialSuccessHandler) } func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error { @@ -259,7 +259,7 @@ func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler par type partialSuccessHandler func(bytes []byte, contentType string) error -func tracesPartialSuccessHandler(protoBytes []byte, contentType string) error { +func (e *baseExporter) tracesPartialSuccessHandler(protoBytes []byte, contentType string) error { if contentType != protobufContentType { return nil } @@ -270,12 +270,17 @@ func tracesPartialSuccessHandler(protoBytes []byte, contentType string) error { } partialSuccess := exportResponse.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { - return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedSpans())) + // TODO: These should be counted as "rejected", similar to dropped items. + // https://github.com/open-telemetry/opentelemetry-collector/issues/9243 + e.logger.Warn("partial success", + zap.String("message", exportResponse.PartialSuccess().ErrorMessage()), + zap.Int64("num_rejected", exportResponse.PartialSuccess().RejectedSpans()), + ) } return nil } -func metricsPartialSuccessHandler(protoBytes []byte, contentType string) error { +func (e *baseExporter) metricsPartialSuccessHandler(protoBytes []byte, contentType string) error { if contentType != protobufContentType { return nil } @@ -286,12 +291,17 @@ func metricsPartialSuccessHandler(protoBytes []byte, contentType string) error { } partialSuccess := exportResponse.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) { - return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedDataPoints())) + // TODO: These should be counted as "rejected", similar to dropped items. + // https://github.com/open-telemetry/opentelemetry-collector/issues/9243 + e.logger.Warn("partial success", + zap.String("message", exportResponse.PartialSuccess().ErrorMessage()), + zap.Int64("num_rejected", exportResponse.PartialSuccess().RejectedDataPoints()), + ) } return nil } -func logsPartialSuccessHandler(protoBytes []byte, contentType string) error { +func (e *baseExporter) logsPartialSuccessHandler(protoBytes []byte, contentType string) error { if contentType != protobufContentType { return nil } @@ -302,7 +312,12 @@ func logsPartialSuccessHandler(protoBytes []byte, contentType string) error { } partialSuccess := exportResponse.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { - return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedLogRecords())) + // TODO: These should be counted as "rejected", similar to dropped items. + // https://github.com/open-telemetry/opentelemetry-collector/issues/9243 + e.logger.Warn("partial success", + zap.String("message", exportResponse.PartialSuccess().ErrorMessage()), + zap.Int64("num_rejected", exportResponse.PartialSuccess().RejectedLogRecords()), + ) } return nil } diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 578e6af3ace..a0010570288 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -17,6 +17,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" codes "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -343,21 +345,25 @@ func TestUserAgent(t *testing.T) { } func TestPartialSuccessInvalidBody(t *testing.T) { + cfg := createDefaultConfig() + set := exportertest.NewNopCreateSettings() + exp, err := newExporter(cfg, set) + require.NoError(t, err) invalidBodyCases := []struct { telemetryType string handler partialSuccessHandler }{ { telemetryType: "traces", - handler: tracesPartialSuccessHandler, + handler: exp.tracesPartialSuccessHandler, }, { telemetryType: "metrics", - handler: metricsPartialSuccessHandler, + handler: exp.metricsPartialSuccessHandler, }, { telemetryType: "logs", - handler: logsPartialSuccessHandler, + handler: exp.logsPartialSuccessHandler, }, } for _, tt := range invalidBodyCases { @@ -369,6 +375,10 @@ func TestPartialSuccessInvalidBody(t *testing.T) { } func TestPartialSuccessUnsupportedContentType(t *testing.T) { + cfg := createDefaultConfig() + set := exportertest.NewNopCreateSettings() + exp, err := newExporter(cfg, set) + require.NoError(t, err) unsupportedContentTypeCases := []struct { contentType string }{ @@ -388,11 +398,11 @@ func TestPartialSuccessUnsupportedContentType(t *testing.T) { var handler func(b []byte, contentType string) error switch telemetryType { case "logs": - handler = logsPartialSuccessHandler + handler = exp.logsPartialSuccessHandler case "metrics": - handler = metricsPartialSuccessHandler + handler = exp.metricsPartialSuccessHandler case "traces": - handler = tracesPartialSuccessHandler + handler = exp.tracesPartialSuccessHandler default: panic(telemetryType) } @@ -426,7 +436,12 @@ func TestPartialSuccess_logs(t *testing.T) { LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{}, } - exp, err := createLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + set := exportertest.NewNopCreateSettings() + + logger, observed := observer.New(zap.DebugLevel) + set.TelemetrySettings.Logger = zap.New(logger) + + exp, err := createLogsExporter(context.Background(), set, cfg) require.NoError(t, err) // start the exporter @@ -439,10 +454,17 @@ func TestPartialSuccess_logs(t *testing.T) { // generate data logs := plog.NewLogs() err = exp.ConsumeLogs(context.Background(), logs) - require.Error(t, err) + require.NoError(t, err) + require.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1) + require.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "partial success") } func TestPartialResponse_missingHeaderButHasBody(t *testing.T) { + cfg := createDefaultConfig() + set := exportertest.NewNopCreateSettings() + exp, err := newExporter(cfg, set) + require.NoError(t, err) + response := ptraceotlp.NewExportResponse() partial := response.PartialSuccess() partial.SetErrorMessage("hello") @@ -457,11 +479,16 @@ func TestPartialResponse_missingHeaderButHasBody(t *testing.T) { "Content-Type": {"application/x-protobuf"}, }, } - err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) - assert.True(t, consumererror.IsPermanent(err)) + err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler) + assert.NoError(t, err) } func TestPartialResponse_missingHeaderAndBody(t *testing.T) { + cfg := createDefaultConfig() + set := exportertest.NewNopCreateSettings() + exp, err := newExporter(cfg, set) + require.NoError(t, err) + resp := &http.Response{ // `-1` indicates a missing Content-Length header in the Go http standard library ContentLength: -1, @@ -470,21 +497,31 @@ func TestPartialResponse_missingHeaderAndBody(t *testing.T) { "Content-Type": {"application/x-protobuf"}, }, } - err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler) assert.Nil(t, err) } func TestPartialResponse_nonErrUnexpectedEOFError(t *testing.T) { + cfg := createDefaultConfig() + set := exportertest.NewNopCreateSettings() + exp, err := newExporter(cfg, set) + require.NoError(t, err) + resp := &http.Response{ // `-1` indicates a missing Content-Length header in the Go http standard library ContentLength: -1, Body: io.NopCloser(badReader{}), } - err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler) assert.Error(t, err) } func TestPartialSuccess_shortContentLengthHeader(t *testing.T) { + cfg := createDefaultConfig() + set := exportertest.NewNopCreateSettings() + exp, err := newExporter(cfg, set) + require.NoError(t, err) + response := ptraceotlp.NewExportResponse() partial := response.PartialSuccess() partial.SetErrorMessage("hello") @@ -498,11 +535,21 @@ func TestPartialSuccess_shortContentLengthHeader(t *testing.T) { "Content-Type": {"application/x-protobuf"}, }, } - err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + // For short content-length, a real error happens. + err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler) assert.Error(t, err) } func TestPartialSuccess_longContentLengthHeader(t *testing.T) { + cfg := createDefaultConfig() + set := exportertest.NewNopCreateSettings() + + logger, observed := observer.New(zap.DebugLevel) + set.TelemetrySettings.Logger = zap.New(logger) + + exp, err := newExporter(cfg, set) + require.NoError(t, err) + response := ptraceotlp.NewExportResponse() partial := response.PartialSuccess() partial.SetErrorMessage("hello") @@ -516,11 +563,20 @@ func TestPartialSuccess_longContentLengthHeader(t *testing.T) { "Content-Type": {"application/x-protobuf"}, }, } - err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) - assert.Error(t, err) + // No real error happens for long content length, so the partial + // success is handled as success with a warning. + err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler) + assert.NoError(t, err) + assert.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1) + assert.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "partial success") } func TestPartialSuccessInvalidResponseBody(t *testing.T) { + cfg := createDefaultConfig() + set := exportertest.NewNopCreateSettings() + exp, err := newExporter(cfg, set) + require.NoError(t, err) + resp := &http.Response{ Body: io.NopCloser(badReader{}), ContentLength: 100, @@ -528,7 +584,7 @@ func TestPartialSuccessInvalidResponseBody(t *testing.T) { "Content-Type": {protobufContentType}, }, } - err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler) assert.Error(t, err) } @@ -550,7 +606,10 @@ func TestPartialSuccess_traces(t *testing.T) { TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{}, } - exp, err := createTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.DebugLevel) + set.TelemetrySettings.Logger = zap.New(logger) + exp, err := createTracesExporter(context.Background(), set, cfg) require.NoError(t, err) // start the exporter @@ -563,7 +622,9 @@ func TestPartialSuccess_traces(t *testing.T) { // generate data traces := ptrace.NewTraces() err = exp.ConsumeTraces(context.Background(), traces) - require.Error(t, err) + require.NoError(t, err) + require.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1) + require.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "partial success") } func TestPartialSuccess_metrics(t *testing.T) { @@ -584,7 +645,10 @@ func TestPartialSuccess_metrics(t *testing.T) { MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{}, } - exp, err := createMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.DebugLevel) + set.TelemetrySettings.Logger = zap.New(logger) + exp, err := createMetricsExporter(context.Background(), set, cfg) require.NoError(t, err) // start the exporter @@ -597,7 +661,9 @@ func TestPartialSuccess_metrics(t *testing.T) { // generate data metrics := pmetric.NewMetrics() err = exp.ConsumeMetrics(context.Background(), metrics) - require.Error(t, err) + require.NoError(t, err) + require.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1) + require.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "partial success") } func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server {