From 612fb2defb8495b1c81278097e64c273b9218513 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 1 Aug 2022 16:52:50 +0800 Subject: [PATCH] Adjust code to opentelemetry-collector changes Mostly just adjusting to API changes, but there are some functional changes related to Jaeger span events. https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/10273 uncovered an issue where we were recording the "event" tag as a label, when we should have been recording it as the "message" field in some circumstances. --- .../receiver/otlpreceiver/mixin.go | 21 +- internal/beater/jaeger/grpc.go | 5 +- internal/beater/jaeger/grpc_test.go | 17 +- internal/beater/otlp/grpc_test.go | 38 +- internal/beater/otlp/http_test.go | 38 +- internal/model/modeldecoder/v2/decoder.go | 55 +- internal/processor/otel/exceptions_test.go | 25 +- internal/processor/otel/logs.go | 29 +- internal/processor/otel/logs_test.go | 39 +- internal/processor/otel/metadata.go | 22 +- internal/processor/otel/metadata_test.go | 106 ++-- internal/processor/otel/metrics.go | 94 +-- internal/processor/otel/metrics_test.go | 244 ++++---- .../span_jaeger_http.approved.json | 2 +- .../transaction_jaeger_full.approved.json | 2 +- internal/processor/otel/timestamps.go | 4 +- internal/processor/otel/traces.go | 153 ++--- internal/processor/otel/traces_test.go | 554 +++++++++--------- testdata/jaeger/batch_0.approved.json | 6 +- testdata/jaeger/batch_1.approved.json | 2 +- 20 files changed, 721 insertions(+), 735 deletions(-) diff --git a/internal/.otel_collector_mixin/receiver/otlpreceiver/mixin.go b/internal/.otel_collector_mixin/receiver/otlpreceiver/mixin.go index 628a933034a..385d2a7e38b 100644 --- a/internal/.otel_collector_mixin/receiver/otlpreceiver/mixin.go +++ b/internal/.otel_collector_mixin/receiver/otlpreceiver/mixin.go @@ -18,8 +18,6 @@ import ( "context" "net/http" - "go.opentelemetry.io/collector/component/componenterror" - "go.opentelemetry.io/otel/metric" apitrace "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -29,7 +27,9 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/model/otlpgrpc" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace" @@ -62,21 +62,21 @@ type HTTPHandlers struct { // RegisterGRPCTraceReceiver registers the trace receiver with a gRPC server. func RegisterGRPCTraceReceiver(ctx context.Context, consumer consumer.Traces, serverGRPC *grpc.Server) error { receiver := trace.New(config.NewComponentID("otlp"), consumer, settings) - otlpgrpc.RegisterTracesServer(serverGRPC, receiver) + ptraceotlp.RegisterServer(serverGRPC, receiver) return nil } // RegisterGRPCMetricsReceiver registers the metrics receiver with a gRPC server. func RegisterGRPCMetricsReceiver(ctx context.Context, consumer consumer.Metrics, serverGRPC *grpc.Server) error { receiver := metrics.New(config.NewComponentID("otlp"), consumer, settings) - otlpgrpc.RegisterMetricsServer(serverGRPC, receiver) + pmetricotlp.RegisterServer(serverGRPC, receiver) return nil } // RegisterGRPCLogsReceiver registers the logs receiver with a gRPC server. func RegisterGRPCLogsReceiver(ctx context.Context, consumer consumer.Logs, serverGRPC *grpc.Server) error { receiver := logs.New(config.NewComponentID("otlp"), consumer, settings) - otlpgrpc.RegisterLogsServer(serverGRPC, receiver) + plogotlp.RegisterServer(serverGRPC, receiver) return nil } @@ -84,9 +84,6 @@ func RegisterGRPCLogsReceiver(ctx context.Context, consumer consumer.Logs, serve func TracesHTTPHandler(ctx context.Context, consumer consumer.Traces) (http.HandlerFunc, error) { receiver := trace.New(config.NewComponentID("otlp"), consumer, settings) - if consumer == nil { - return nil, componenterror.ErrNilNextConsumer - } return func(w http.ResponseWriter, r *http.Request) { handleTraces(w, r, receiver, pbEncoder) }, nil @@ -94,9 +91,6 @@ func TracesHTTPHandler(ctx context.Context, consumer consumer.Traces) (http.Hand func MetricsHTTPHandler(ctx context.Context, consumer consumer.Metrics) (http.HandlerFunc, error) { receiver := metrics.New(config.NewComponentID("otlp"), consumer, settings) - if consumer == nil { - return nil, componenterror.ErrNilNextConsumer - } return func(w http.ResponseWriter, r *http.Request) { handleMetrics(w, r, receiver, pbEncoder) }, nil @@ -104,9 +98,6 @@ func MetricsHTTPHandler(ctx context.Context, consumer consumer.Metrics) (http.Ha func LogsHTTPHandler(ctx context.Context, consumer consumer.Logs) (http.HandlerFunc, error) { receiver := logs.New(config.NewComponentID("otlp"), consumer, settings) - if consumer == nil { - return nil, componenterror.ErrNilNextConsumer - } return func(w http.ResponseWriter, r *http.Request) { handleLogs(w, r, receiver, pbEncoder) }, nil diff --git a/internal/beater/jaeger/grpc.go b/internal/beater/jaeger/grpc.go index a61eed9ec98..5147e318d03 100644 --- a/internal/beater/jaeger/grpc.go +++ b/internal/beater/jaeger/grpc.go @@ -108,7 +108,10 @@ func (c *grpcCollector) PostSpans(ctx context.Context, r *api_v2.PostSpansReques func (c *grpcCollector) postSpans(ctx context.Context, batch jaegermodel.Batch) error { spanCount := int64(len(batch.Spans)) gRPCCollectorMonitoringMap.add(request.IDEventReceivedCount, spanCount) - traces := jaegertranslator.ProtoBatchToInternalTraces(batch) + traces, err := jaegertranslator.ProtoToTraces([]*jaegermodel.Batch{&batch}) + if err != nil { + return err + } return c.consumer.ConsumeTraces(ctx, traces) } diff --git a/internal/beater/jaeger/grpc_test.go b/internal/beater/jaeger/grpc_test.go index 054f2fa4684..7349f39c19e 100644 --- a/internal/beater/jaeger/grpc_test.go +++ b/internal/beater/jaeger/grpc_test.go @@ -31,7 +31,8 @@ import ( jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" @@ -97,17 +98,17 @@ func TestPostSpans(t *testing.T) { } func newPostSpansRequest(t *testing.T) *api_v2.PostSpansRequest { - traces := pdata.NewTraces() + traces := ptrace.NewTraces() resourceSpans := traces.ResourceSpans().AppendEmpty() - spans := resourceSpans.InstrumentationLibrarySpans().AppendEmpty() + spans := resourceSpans.ScopeSpans().AppendEmpty() span0 := spans.Spans().AppendEmpty() - span0.SetTraceID(pdata.NewTraceID([16]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})) - span0.SetSpanID(pdata.NewSpanID([8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF})) + span0.SetTraceID(pcommon.NewTraceID([16]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})) + span0.SetSpanID(pcommon.NewSpanID([8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF})) span1 := spans.Spans().AppendEmpty() - span1.SetTraceID(pdata.NewTraceID([16]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})) - span1.SetSpanID(pdata.NewSpanID([8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF})) + span1.SetTraceID(pcommon.NewTraceID([16]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})) + span1.SetSpanID(pcommon.NewSpanID([8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF})) - batches, err := jaegertranslator.InternalTracesToJaegerProto(traces) + batches, err := jaegertranslator.ProtoFromTraces(traces) require.NoError(t, err) require.Len(t, batches, 1) return &api_v2.PostSpansRequest{Batch: *batches[0]} diff --git a/internal/beater/otlp/grpc_test.go b/internal/beater/otlp/grpc_test.go index 0b755fdec04..cef13e4d339 100644 --- a/internal/beater/otlp/grpc_test.go +++ b/internal/beater/otlp/grpc_test.go @@ -25,8 +25,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/model/otlpgrpc" - "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" @@ -47,18 +51,17 @@ func TestConsumeTracesGRPC(t *testing.T) { } conn := newGRPCServer(t, batchProcessor) - client := otlpgrpc.NewTracesClient(conn) + client := ptraceotlp.NewClient(conn) // Send a minimal trace to verify that everything is connected properly. // // We intentionally do not check the published event contents; those are // tested in processor/otel. - traces := pdata.NewTraces() - span := traces.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty() + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() span.SetName("operation_name") - tracesRequest := otlpgrpc.NewTracesRequest() - tracesRequest.SetTraces(traces) + tracesRequest := ptraceotlp.NewRequestFromTraces(traces) _, err := client.Export(context.Background(), tracesRequest) assert.NoError(t, err) require.Len(t, batches, 1) @@ -94,20 +97,19 @@ func TestConsumeMetricsGRPC(t *testing.T) { } conn := newGRPCServer(t, batchProcessor) - client := otlpgrpc.NewMetricsClient(conn) + client := pmetricotlp.NewClient(conn) // Send a minimal metric to verify that everything is connected properly. // // We intentionally do not check the published event contents; those are // tested in processor/otel. - metrics := pdata.NewMetrics() - metric := metrics.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics().AppendEmpty() + metrics := pmetric.NewMetrics() + metric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() metric.SetName("metric_type") - metric.SetDataType(pdata.MetricDataTypeSummary) + metric.SetDataType(pmetric.MetricDataTypeSummary) metric.Summary().DataPoints().AppendEmpty() - metricsRequest := otlpgrpc.NewMetricsRequest() - metricsRequest.SetMetrics(metrics) + metricsRequest := pmetricotlp.NewRequestFromMetrics(metrics) _, err := client.Export(context.Background(), metricsRequest) assert.NoError(t, err) @@ -144,18 +146,16 @@ func TestConsumeLogsGRPC(t *testing.T) { } conn := newGRPCServer(t, batchProcessor) - client := otlpgrpc.NewLogsClient(conn) + client := plogotlp.NewClient(conn) // Send a minimal log record to verify that everything is connected properly. // // We intentionally do not check the published event contents; those are // tested in processor/otel. - logs := pdata.NewLogs() - logRecord := logs.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty().LogRecords().AppendEmpty() - logRecord.SetName("log_name") + logs := plog.NewLogs() + logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - logsRequest := otlpgrpc.NewLogsRequest() - logsRequest.SetLogs(logs) + logsRequest := plogotlp.NewRequestFromLogs(logs) _, err := client.Export(context.Background(), logsRequest) assert.NoError(t, err) require.Len(t, batches, 1) diff --git a/internal/beater/otlp/http_test.go b/internal/beater/otlp/http_test.go index 06345b881e4..bee07461ef6 100644 --- a/internal/beater/otlp/http_test.go +++ b/internal/beater/otlp/http_test.go @@ -27,8 +27,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/model/otlpgrpc" - "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "github.com/elastic/apm-server/internal/agentcfg" "github.com/elastic/apm-server/internal/beater/api" @@ -54,13 +58,12 @@ func TestConsumeTracesHTTP(t *testing.T) { // // We intentionally do not check the published event contents; those are // tested in processor/otel. - traces := pdata.NewTraces() - span := traces.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty() + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() span.SetName("operation_name") - tracesRequest := otlpgrpc.NewTracesRequest() - tracesRequest.SetTraces(traces) - request, err := tracesRequest.Marshal() + tracesRequest := ptraceotlp.NewRequestFromTraces(traces) + request, err := tracesRequest.MarshalProto() assert.NoError(t, err) req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s/v1/traces", addr), bytes.NewReader(request)) assert.NoError(t, err) @@ -98,15 +101,14 @@ func TestConsumeMetricsHTTP(t *testing.T) { // // We intentionally do not check the published event contents; those are // tested in processor/otel. - metrics := pdata.NewMetrics() - metric := metrics.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics().AppendEmpty() + metrics := pmetric.NewMetrics() + metric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() metric.SetName("metric_type") - metric.SetDataType(pdata.MetricDataTypeSummary) + metric.SetDataType(pmetric.MetricDataTypeSummary) metric.Summary().DataPoints().AppendEmpty() - metricsRequest := otlpgrpc.NewMetricsRequest() - metricsRequest.SetMetrics(metrics) - request, err := metricsRequest.Marshal() + metricsRequest := pmetricotlp.NewRequestFromMetrics(metrics) + request, err := metricsRequest.MarshalProto() assert.NoError(t, err) req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s/v1/metrics", addr), bytes.NewReader(request)) assert.NoError(t, err) @@ -146,13 +148,11 @@ func TestConsumeLogsHTTP(t *testing.T) { // // We intentionally do not check the published event contents; those are // tested in processor/otel. - logs := pdata.NewLogs() - logRecord := logs.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty().LogRecords().AppendEmpty() - logRecord.SetName("log_name") + logs := plog.NewLogs() + logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - logsRequest := otlpgrpc.NewLogsRequest() - logsRequest.SetLogs(logs) - request, err := logsRequest.Marshal() + logsRequest := plogotlp.NewRequestFromLogs(logs) + request, err := logsRequest.MarshalProto() assert.NoError(t, err) req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s/v1/logs", addr), bytes.NewReader(request)) assert.NoError(t, err) diff --git a/internal/model/modeldecoder/v2/decoder.go b/internal/model/modeldecoder/v2/decoder.go index 177e1a7a81b..be85177a7ad 100644 --- a/internal/model/modeldecoder/v2/decoder.go +++ b/internal/model/modeldecoder/v2/decoder.go @@ -37,7 +37,8 @@ import ( "github.com/elastic/apm-server/internal/netutil" otel_processor "github.com/elastic/apm-server/internal/processor/otel" - "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" ) var ( @@ -1284,7 +1285,7 @@ func mapToTransactionModel(from *transaction, event *model.APMEvent) { } func mapOTelAttributesTransaction(from otel, out *model.APMEvent) { - library := pdata.NewInstrumentationLibrary() + scope := pcommon.NewInstrumentationScope() m := otelAttributeMap(&from) if from.SpanKind.IsSet() { out.Span.Kind = from.SpanKind.Val @@ -1297,9 +1298,9 @@ func mapOTelAttributesTransaction(from otel, out *model.APMEvent) { } // TODO: Does this work? Is there a way we can infer the status code, // potentially in the actual attributes map? - spanStatus := pdata.NewSpanStatus() - spanStatus.SetCode(pdata.StatusCodeUnset) - otel_processor.TranslateTransaction(m, spanStatus, library, out) + spanStatus := ptrace.NewSpanStatus() + spanStatus.SetCode(ptrace.StatusCodeUnset) + otel_processor.TranslateTransaction(m, spanStatus, scope, out) if out.Span.Kind == "" { switch out.Transaction.Type { @@ -1323,27 +1324,27 @@ func mapOTelAttributesSpan(from otel, out *model.APMEvent) { if out.NumericLabels == nil { out.NumericLabels = make(model.NumericLabels) } - var spanKind pdata.SpanKind + var spanKind ptrace.SpanKind if from.SpanKind.IsSet() { switch from.SpanKind.Val { - case pdata.SpanKindInternal.String()[len(spanKindStringPrefix):]: - spanKind = pdata.SpanKindInternal - case pdata.SpanKindServer.String()[len(spanKindStringPrefix):]: - spanKind = pdata.SpanKindServer - case pdata.SpanKindClient.String()[len(spanKindStringPrefix):]: - spanKind = pdata.SpanKindClient - case pdata.SpanKindProducer.String()[len(spanKindStringPrefix):]: - spanKind = pdata.SpanKindProducer - case pdata.SpanKindConsumer.String()[len(spanKindStringPrefix):]: - spanKind = pdata.SpanKindConsumer + case ptrace.SpanKindInternal.String()[len(spanKindStringPrefix):]: + spanKind = ptrace.SpanKindInternal + case ptrace.SpanKindServer.String()[len(spanKindStringPrefix):]: + spanKind = ptrace.SpanKindServer + case ptrace.SpanKindClient.String()[len(spanKindStringPrefix):]: + spanKind = ptrace.SpanKindClient + case ptrace.SpanKindProducer.String()[len(spanKindStringPrefix):]: + spanKind = ptrace.SpanKindProducer + case ptrace.SpanKindConsumer.String()[len(spanKindStringPrefix):]: + spanKind = ptrace.SpanKindConsumer default: - spanKind = pdata.SpanKindUnspecified + spanKind = ptrace.SpanKindUnspecified } out.Span.Kind = from.SpanKind.Val } otel_processor.TranslateSpan(spanKind, m, out) - if spanKind == pdata.SpanKindUnspecified { + if spanKind == ptrace.SpanKindUnspecified { switch out.Span.Type { case "db", "external", "storage": out.Span.Kind = "CLIENT" @@ -1384,8 +1385,8 @@ func overwriteUserInMetadataModel(from user, out *model.APMEvent) { } } -func otelAttributeMap(o *otel) pdata.AttributeMap { - m := pdata.NewAttributeMap() +func otelAttributeMap(o *otel) pcommon.Map { + m := pcommon.NewMap() for k, v := range o.Attributes { if attr, ok := otelAttributeValue(k, v); ok { m.Insert(k, attr) @@ -1394,29 +1395,29 @@ func otelAttributeMap(o *otel) pdata.AttributeMap { return m } -func otelAttributeValue(k string, v interface{}) (pdata.AttributeValue, bool) { +func otelAttributeValue(k string, v interface{}) (pcommon.Value, bool) { // According to the spec, these are the allowed primitive types // Additionally, homogeneous arrays (single type) of primitive types are allowed // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/common.md#attributes switch v := v.(type) { case string: - return pdata.NewAttributeValueString(v), true + return pcommon.NewValueString(v), true case bool: - return pdata.NewAttributeValueBool(v), true + return pcommon.NewValueBool(v), true case json.Number: // Semantic conventions have specified types, and we rely on this // in processor/otel when mapping to our data model. For example, // `http.status_code` is expected to be an int. if !isOTelDoubleAttribute(k) { if v, err := v.Int64(); err == nil { - return pdata.NewAttributeValueInt(v), true + return pcommon.NewValueInt(v), true } } if v, err := v.Float64(); err == nil { - return pdata.NewAttributeValueDouble(v), true + return pcommon.NewValueDouble(v), true } case []interface{}: - array := pdata.NewAttributeValueArray() + array := pcommon.NewValueSlice() array.SliceVal().EnsureCapacity(len(v)) for i := range v { if elem, ok := otelAttributeValue(k, v[i]); ok { @@ -1425,7 +1426,7 @@ func otelAttributeValue(k string, v interface{}) (pdata.AttributeValue, bool) { } return array, true } - return pdata.AttributeValue{}, false + return pcommon.Value{}, false } // isOTelDoubleAttribute indicates whether k is an OpenTelemetry semantic convention attribute diff --git a/internal/processor/otel/exceptions_test.go b/internal/processor/otel/exceptions_test.go index 3db3b4d6788..8dda3aab9ce 100644 --- a/internal/processor/otel/exceptions_test.go +++ b/internal/processor/otel/exceptions_test.go @@ -40,17 +40,18 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/model/pdata" - semconv "go.opentelemetry.io/collector/model/semconv/v1.5.0" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "github.com/elastic/apm-server/internal/model" ) func TestEncodeSpanEventsNonExceptions(t *testing.T) { - nonExceptionEvent := pdata.NewSpanEvent() + nonExceptionEvent := ptrace.NewSpanEvent() nonExceptionEvent.SetName("not_exception") - incompleteExceptionEvent := pdata.NewSpanEvent() + incompleteExceptionEvent := ptrace.NewSpanEvent() incompleteExceptionEvent.SetName("exception") incompleteExceptionEvent.Attributes().InsertString( // At least one of exception.message and exception.type is required. @@ -66,8 +67,8 @@ func TestEncodeSpanEventsNonExceptions(t *testing.T) { func TestEncodeSpanEventsJavaExceptions(t *testing.T) { timestamp := time.Unix(123, 0).UTC() - exceptionEvent1 := pdata.NewSpanEvent() - exceptionEvent1.SetTimestamp(pdata.NewTimestampFromTime(timestamp)) + exceptionEvent1 := ptrace.NewSpanEvent() + exceptionEvent1.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) exceptionEvent1.SetName("exception") exceptionEvent1.Attributes().InsertString("exception.type", "java.net.ConnectException.OSError") exceptionEvent1.Attributes().InsertString("exception.message", "Division by zero") @@ -82,8 +83,8 @@ Exception in thread "main" java.lang.RuntimeException: Test exception at java.base/java.lang.Thread.run(Unknown Source) `[1:]) - exceptionEvent2 := pdata.NewSpanEvent() - exceptionEvent2.SetTimestamp(pdata.NewTimestampFromTime(timestamp)) + exceptionEvent2 := ptrace.NewSpanEvent() + exceptionEvent2.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) exceptionEvent2.SetName("exception") exceptionEvent2.Attributes().InsertString("exception.type", "HighLevelException") exceptionEvent2.Attributes().InsertString("exception.message", "MidLevelException: LowLevelException") @@ -282,9 +283,9 @@ Caused by: whatever at the movies`, } - var events []pdata.SpanEvent + var events []ptrace.SpanEvent for _, stacktrace := range stacktraces { - event := pdata.NewSpanEvent() + event := ptrace.NewSpanEvent() event.SetName("exception") event.Attributes().InsertString("exception.type", "ExceptionType") event.Attributes().InsertString("exception.stacktrace", stacktrace) @@ -303,8 +304,8 @@ Caused by: whatever func TestEncodeSpanEventsNonJavaExceptions(t *testing.T) { timestamp := time.Unix(123, 0).UTC() - exceptionEvent := pdata.NewSpanEvent() - exceptionEvent.SetTimestamp(pdata.NewTimestampFromTime(timestamp)) + exceptionEvent := ptrace.NewSpanEvent() + exceptionEvent.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) exceptionEvent.SetName("exception") exceptionEvent.Attributes().InsertString("exception.type", "the_type") exceptionEvent.Attributes().InsertString("exception.message", "the_message") diff --git a/internal/processor/otel/logs.go b/internal/processor/otel/logs.go index 32399f9c6a4..b3a39bfc436 100644 --- a/internal/processor/otel/logs.go +++ b/internal/processor/otel/logs.go @@ -38,17 +38,17 @@ import ( "context" "time" - "go.opentelemetry.io/collector/model/otlp" - "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" apmserverlogs "github.com/elastic/apm-server/internal/logs" "github.com/elastic/apm-server/internal/model" "github.com/elastic/elastic-agent-libs/logp" ) -var jsonLogsMarshaler = otlp.NewJSONLogsMarshaler() +var jsonLogsMarshaler = plog.NewJSONMarshaler() -func (c *Consumer) ConsumeLogs(ctx context.Context, logs pdata.Logs) error { +func (c *Consumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error { receiveTimestamp := time.Now() logger := logp.NewLogger(apmserverlogs.Otel) if logger.IsDebug() { @@ -67,7 +67,7 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, logs pdata.Logs) error { return c.Processor.ProcessBatch(ctx, &batch) } -func (c *Consumer) convertResourceLogs(resourceLogs pdata.ResourceLogs, receiveTimestamp time.Time, out *model.Batch) { +func (c *Consumer) convertResourceLogs(resourceLogs plog.ResourceLogs, receiveTimestamp time.Time, out *model.Batch) { var timeDelta time.Duration resource := resourceLogs.Resource() baseEvent := model.APMEvent{Processor: model.LogProcessor} @@ -76,14 +76,14 @@ func (c *Consumer) convertResourceLogs(resourceLogs pdata.ResourceLogs, receiveT if exportTimestamp, ok := exportTimestamp(resource); ok { timeDelta = receiveTimestamp.Sub(exportTimestamp) } - instrumentationLibraryLogs := resourceLogs.InstrumentationLibraryLogs() - for i := 0; i < instrumentationLibraryLogs.Len(); i++ { - c.convertInstrumentationLibraryLogs(instrumentationLibraryLogs.At(i), baseEvent, timeDelta, out) + scopeLogs := resourceLogs.ScopeLogs() + for i := 0; i < scopeLogs.Len(); i++ { + c.convertInstrumentationLibraryLogs(scopeLogs.At(i), baseEvent, timeDelta, out) } } func (c *Consumer) convertInstrumentationLibraryLogs( - in pdata.InstrumentationLibraryLogs, + in plog.ScopeLogs, baseEvent model.APMEvent, timeDelta time.Duration, out *model.Batch, @@ -96,7 +96,7 @@ func (c *Consumer) convertInstrumentationLibraryLogs( } func (c *Consumer) convertLogRecord( - record pdata.LogRecord, + record plog.LogRecord, baseEvent model.APMEvent, timeDelta time.Duration, ) model.APMEvent { @@ -104,11 +104,10 @@ func (c *Consumer) convertLogRecord( initEventLabels(&event) event.Timestamp = record.Timestamp().AsTime().Add(timeDelta) event.Event.Severity = int64(record.SeverityNumber()) - event.Event.Action = record.Name() event.Log.Level = record.SeverityText() - if body := record.Body(); body.Type() != pdata.AttributeValueTypeEmpty { + if body := record.Body(); body.Type() != pcommon.ValueTypeEmpty { event.Message = body.AsString() - if body.Type() == pdata.AttributeValueTypeMap { + if body.Type() == pcommon.ValueTypeMap { setLabels(body.MapVal(), &event) } } @@ -127,8 +126,8 @@ func (c *Consumer) convertLogRecord( return event } -func setLabels(m pdata.AttributeMap, event *model.APMEvent) { - m.Range(func(k string, v pdata.AttributeValue) bool { +func setLabels(m pcommon.Map, event *model.APMEvent) { + m.Range(func(k string, v pcommon.Value) bool { setLabel(k, event, ifaceAttributeValue(v)) return true }) diff --git a/internal/processor/otel/logs_test.go b/internal/processor/otel/logs_test.go index ab4db6d18dd..dce655feddf 100644 --- a/internal/processor/otel/logs_test.go +++ b/internal/processor/otel/logs_test.go @@ -40,8 +40,9 @@ import ( "time" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/model/pdata" - semconv "go.opentelemetry.io/collector/model/semconv/v1.5.0" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "github.com/elastic/apm-server/internal/model" "github.com/elastic/apm-server/internal/processor/otel" @@ -55,7 +56,7 @@ func TestConsumerConsumeLogs(t *testing.T) { } consumer := otel.Consumer{Processor: processor} - logs := pdata.NewLogs() + logs := plog.NewLogs() assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs)) }) @@ -71,8 +72,7 @@ func TestConsumerConsumeLogs(t *testing.T) { }, Message: "a random log message", Event: model.Event{ - Severity: int64(pdata.SeverityNumberINFO), - Action: "doOperation()", + Severity: int64(plog.SeverityNumberINFO), }, Log: model.Log{Level: "Info"}, Span: &model.Span{ID: "0200000000000000"}, @@ -82,11 +82,11 @@ func TestConsumerConsumeLogs(t *testing.T) { } test := func(name string, body interface{}, expectedMessage string) { t.Run(name, func(t *testing.T) { - logs := pdata.NewLogs() + logs := plog.NewLogs() resourceLogs := logs.ResourceLogs().AppendEmpty() logs.ResourceLogs().At(0).Resource().Attributes().InsertString(semconv.AttributeTelemetrySDKLanguage, "go") - instrumentationLogs := resourceLogs.InstrumentationLibraryLogs().AppendEmpty() - newLogRecord(body).CopyTo(instrumentationLogs.LogRecords().AppendEmpty()) + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + newLogRecord(body).CopyTo(scopeLogs.LogRecords().AppendEmpty()) var processed model.Batch var processor model.ProcessBatchFunc = func(_ context.Context, batch *model.Batch) error { @@ -114,25 +114,25 @@ func TestConsumerConsumeLogs(t *testing.T) { } func TestConsumerConsumeLogsLabels(t *testing.T) { - logs := pdata.NewLogs() + logs := plog.NewLogs() resourceLogs := logs.ResourceLogs().AppendEmpty() resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes() resourceAttrs.InsertString(semconv.AttributeTelemetrySDKLanguage, "go") resourceAttrs.InsertString("key0", "zero") - instrumentationLogs := resourceLogs.InstrumentationLibraryLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() record1 := newLogRecord("whatever") record1.Attributes().InsertString("key1", "one") - record1.CopyTo(instrumentationLogs.LogRecords().AppendEmpty()) + record1.CopyTo(scopeLogs.LogRecords().AppendEmpty()) record2 := newLogRecord("andever") record2.Attributes().InsertDouble("key2", 2) - record2.CopyTo(instrumentationLogs.LogRecords().AppendEmpty()) + record2.CopyTo(scopeLogs.LogRecords().AppendEmpty()) record3 := newLogRecord("amen") record3.Attributes().InsertString("key3", "three") record3.Attributes().InsertInt("key4", 4) - record3.CopyTo(instrumentationLogs.LogRecords().AppendEmpty()) + record3.CopyTo(scopeLogs.LogRecords().AppendEmpty()) var processed model.Batch var processor model.ProcessBatchFunc = func(_ context.Context, batch *model.Batch) error { @@ -156,14 +156,13 @@ func TestConsumerConsumeLogsLabels(t *testing.T) { assert.Equal(t, model.NumericLabels{"key4": {Value: 4}}, processed[2].NumericLabels) } -func newLogRecord(body interface{}) pdata.LogRecord { - otelLogRecord := pdata.NewLogRecord() - otelLogRecord.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelLogRecord.SetSpanID(pdata.NewSpanID([8]byte{2})) - otelLogRecord.SetName("doOperation()") - otelLogRecord.SetSeverityNumber(pdata.SeverityNumberINFO) +func newLogRecord(body interface{}) plog.LogRecord { + otelLogRecord := plog.NewLogRecord() + otelLogRecord.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelLogRecord.SetSpanID(pcommon.NewSpanID([8]byte{2})) + otelLogRecord.SetSeverityNumber(plog.SeverityNumberINFO) otelLogRecord.SetSeverityText("Info") - otelLogRecord.SetTimestamp(pdata.NewTimestampFromTime(time.Now())) + otelLogRecord.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) switch b := body.(type) { case string: diff --git a/internal/processor/otel/metadata.go b/internal/processor/otel/metadata.go index a76ecb654f0..143e6a76e31 100644 --- a/internal/processor/otel/metadata.go +++ b/internal/processor/otel/metadata.go @@ -24,8 +24,8 @@ import ( "strconv" "strings" - "go.opentelemetry.io/collector/model/pdata" - semconv "go.opentelemetry.io/collector/model/semconv/v1.5.0" + "go.opentelemetry.io/collector/pdata/pcommon" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "github.com/elastic/apm-server/internal/model" ) @@ -38,9 +38,9 @@ var ( serviceNameInvalidRegexp = regexp.MustCompile("[^a-zA-Z0-9 _-]") ) -func translateResourceMetadata(resource pdata.Resource, out *model.APMEvent) { +func translateResourceMetadata(resource pcommon.Resource, out *model.APMEvent) { var exporterVersion string - resource.Attributes().Range(func(k string, v pdata.AttributeValue) bool { + resource.Attributes().Range(func(k string, v pcommon.Value) bool { switch k { // service.* case semconv.AttributeServiceName: @@ -201,23 +201,23 @@ func cleanServiceName(name string) string { return serviceNameInvalidRegexp.ReplaceAllString(truncate(name), "_") } -func ifaceAttributeValue(v pdata.AttributeValue) interface{} { +func ifaceAttributeValue(v pcommon.Value) interface{} { switch v.Type() { - case pdata.AttributeValueTypeString: + case pcommon.ValueTypeString: return truncate(v.StringVal()) - case pdata.AttributeValueTypeBool: + case pcommon.ValueTypeBool: return strconv.FormatBool(v.BoolVal()) - case pdata.AttributeValueTypeInt: + case pcommon.ValueTypeInt: return float64(v.IntVal()) - case pdata.AttributeValueTypeDouble: + case pcommon.ValueTypeDouble: return v.DoubleVal() - case pdata.AttributeValueTypeArray: + case pcommon.ValueTypeSlice: return ifaceAttributeValueSlice(v.SliceVal()) } return nil } -func ifaceAttributeValueSlice(slice pdata.AttributeValueSlice) []interface{} { +func ifaceAttributeValueSlice(slice pcommon.Slice) []interface{} { values := make([]interface{}, slice.Len()) for i := range values { values[i] = ifaceAttributeValue(slice.At(i)) diff --git a/internal/processor/otel/metadata_test.go b/internal/processor/otel/metadata_test.go index 977f5f01f10..8503ab43f07 100644 --- a/internal/processor/otel/metadata_test.go +++ b/internal/processor/otel/metadata_test.go @@ -22,7 +22,7 @@ import ( "time" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/collector/pdata/pcommon" "github.com/elastic/apm-server/internal/model" ) @@ -35,7 +35,7 @@ func TestResourceConventions(t *testing.T) { } for name, test := range map[string]struct { - attrs map[string]pdata.AttributeValue + attrs map[string]interface{} expected model.APMEvent }{ "empty": { @@ -43,10 +43,10 @@ func TestResourceConventions(t *testing.T) { expected: model.APMEvent{Agent: defaultAgent, Service: defaultService}, }, "service": { - attrs: map[string]pdata.AttributeValue{ - "service.name": pdata.NewAttributeValueString("service_name"), - "service.version": pdata.NewAttributeValueString("service_version"), - "deployment.environment": pdata.NewAttributeValueString("service_environment"), + attrs: map[string]interface{}{ + "service.name": "service_name", + "service.version": "service_version", + "deployment.environment": "service_environment", }, expected: model.APMEvent{ Agent: model.Agent{Name: "otlp", Version: "unknown"}, @@ -59,10 +59,10 @@ func TestResourceConventions(t *testing.T) { }, }, "agent": { - attrs: map[string]pdata.AttributeValue{ - "telemetry.sdk.name": pdata.NewAttributeValueString("sdk_name"), - "telemetry.sdk.version": pdata.NewAttributeValueString("sdk_version"), - "telemetry.sdk.language": pdata.NewAttributeValueString("language_name"), + attrs: map[string]interface{}{ + "telemetry.sdk.name": "sdk_name", + "telemetry.sdk.version": "sdk_version", + "telemetry.sdk.language": "language_name", }, expected: model.APMEvent{ Agent: model.Agent{Name: "sdk_name/language_name", Version: "sdk_version"}, @@ -73,9 +73,9 @@ func TestResourceConventions(t *testing.T) { }, }, "runtime": { - attrs: map[string]pdata.AttributeValue{ - "process.runtime.name": pdata.NewAttributeValueString("runtime_name"), - "process.runtime.version": pdata.NewAttributeValueString("runtime_version"), + attrs: map[string]interface{}{ + "process.runtime.name": "runtime_name", + "process.runtime.version": "runtime_version", }, expected: model.APMEvent{ Agent: model.Agent{Name: "otlp", Version: "unknown"}, @@ -90,12 +90,12 @@ func TestResourceConventions(t *testing.T) { }, }, "cloud": { - attrs: map[string]pdata.AttributeValue{ - "cloud.provider": pdata.NewAttributeValueString("provider_name"), - "cloud.region": pdata.NewAttributeValueString("region_name"), - "cloud.account.id": pdata.NewAttributeValueString("account_id"), - "cloud.availability_zone": pdata.NewAttributeValueString("availability_zone"), - "cloud.platform": pdata.NewAttributeValueString("platform_name"), + attrs: map[string]interface{}{ + "cloud.provider": "provider_name", + "cloud.region": "region_name", + "cloud.account.id": "account_id", + "cloud.availability_zone": "availability_zone", + "cloud.platform": "platform_name", }, expected: model.APMEvent{ Agent: defaultAgent, @@ -110,12 +110,12 @@ func TestResourceConventions(t *testing.T) { }, }, "container": { - attrs: map[string]pdata.AttributeValue{ - "container.name": pdata.NewAttributeValueString("container_name"), - "container.id": pdata.NewAttributeValueString("container_id"), - "container.image.name": pdata.NewAttributeValueString("container_image_name"), - "container.image.tag": pdata.NewAttributeValueString("container_image_tag"), - "container.runtime": pdata.NewAttributeValueString("container_runtime"), + attrs: map[string]interface{}{ + "container.name": "container_name", + "container.id": "container_id", + "container.image.name": "container_image_name", + "container.image.tag": "container_image_tag", + "container.runtime": "container_runtime", }, expected: model.APMEvent{ Agent: defaultAgent, @@ -130,11 +130,11 @@ func TestResourceConventions(t *testing.T) { }, }, "kubernetes": { - attrs: map[string]pdata.AttributeValue{ - "k8s.namespace.name": pdata.NewAttributeValueString("kubernetes_namespace"), - "k8s.node.name": pdata.NewAttributeValueString("kubernetes_node_name"), - "k8s.pod.name": pdata.NewAttributeValueString("kubernetes_pod_name"), - "k8s.pod.uid": pdata.NewAttributeValueString("kubernetes_pod_uid"), + attrs: map[string]interface{}{ + "k8s.namespace.name": "kubernetes_namespace", + "k8s.node.name": "kubernetes_node_name", + "k8s.pod.name": "kubernetes_pod_name", + "k8s.pod.uid": "kubernetes_pod_uid", }, expected: model.APMEvent{ Agent: defaultAgent, @@ -148,11 +148,11 @@ func TestResourceConventions(t *testing.T) { }, }, "host": { - attrs: map[string]pdata.AttributeValue{ - "host.name": pdata.NewAttributeValueString("host_name"), - "host.id": pdata.NewAttributeValueString("host_id"), - "host.type": pdata.NewAttributeValueString("host_type"), - "host.arch": pdata.NewAttributeValueString("host_arch"), + attrs: map[string]interface{}{ + "host.name": "host_name", + "host.id": "host_id", + "host.type": "host_type", + "host.arch": "host_arch", }, expected: model.APMEvent{ Agent: defaultAgent, @@ -166,10 +166,10 @@ func TestResourceConventions(t *testing.T) { }, }, "process": { - attrs: map[string]pdata.AttributeValue{ - "process.pid": pdata.NewAttributeValueInt(123), - "process.command_line": pdata.NewAttributeValueString("command_line"), - "process.executable.path": pdata.NewAttributeValueString("executable_path"), + attrs: map[string]interface{}{ + "process.pid": 123, + "process.command_line": "command_line", + "process.executable.path": "executable_path", }, expected: model.APMEvent{ Agent: defaultAgent, @@ -182,9 +182,9 @@ func TestResourceConventions(t *testing.T) { }, }, "os": { - attrs: map[string]pdata.AttributeValue{ - "os.type": pdata.NewAttributeValueString("DARWIN"), - "os.description": pdata.NewAttributeValueString("Mac OS Mojave"), + attrs: map[string]interface{}{ + "os.type": "DARWIN", + "os.description": "Mac OS Mojave", }, expected: model.APMEvent{ Agent: defaultAgent, @@ -207,17 +207,9 @@ func TestResourceConventions(t *testing.T) { } func TestResourceLabels(t *testing.T) { - stringArray := pdata.NewAttributeValueArray() - stringArray.SliceVal().AppendEmpty().SetStringVal("abc") - stringArray.SliceVal().AppendEmpty().SetStringVal("def") - - intArray := pdata.NewAttributeValueArray() - intArray.SliceVal().AppendEmpty().SetIntVal(123) - intArray.SliceVal().AppendEmpty().SetIntVal(456) - - metadata := transformResourceMetadata(t, map[string]pdata.AttributeValue{ - "string_array": stringArray, - "int_array": intArray, + metadata := transformResourceMetadata(t, map[string]interface{}{ + "string_array": []interface{}{"abc", "def"}, + "int_array": []interface{}{123, 456}, }) assert.Equal(t, model.Labels{ "string_array": {Values: []string{"abc", "def"}}, @@ -227,12 +219,12 @@ func TestResourceLabels(t *testing.T) { }, metadata.NumericLabels) } -func transformResourceMetadata(t *testing.T, resourceAttrs map[string]pdata.AttributeValue) model.APMEvent { +func transformResourceMetadata(t *testing.T, resourceAttrs map[string]interface{}) model.APMEvent { traces, spans := newTracesSpans() - pdata.NewAttributeMapFromMap(resourceAttrs).CopyTo(traces.ResourceSpans().At(0).Resource().Attributes()) + pcommon.NewMapFromRaw(resourceAttrs).CopyTo(traces.ResourceSpans().At(0).Resource().Attributes()) otelSpan := spans.Spans().AppendEmpty() - otelSpan.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2})) + otelSpan.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelSpan.SetSpanID(pcommon.NewSpanID([8]byte{2})) events := transformTraces(t, traces) events[0].Transaction = nil events[0].Trace = model.Trace{} diff --git a/internal/processor/otel/metrics.go b/internal/processor/otel/metrics.go index 37b75861529..5e752edc4c6 100644 --- a/internal/processor/otel/metrics.go +++ b/internal/processor/otel/metrics.go @@ -43,7 +43,8 @@ import ( "sync/atomic" "time" - "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/elastic/apm-server/internal/logs" "github.com/elastic/apm-server/internal/model" @@ -52,7 +53,7 @@ import ( // ConsumeMetrics consumes OpenTelemetry metrics data, converting into // the Elastic APM metrics model and sending to the reporter. -func (c *Consumer) ConsumeMetrics(ctx context.Context, metrics pdata.Metrics) error { +func (c *Consumer) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error { receiveTimestamp := time.Now() logger := logp.NewLogger(logs.Otel) if logger.IsDebug() { @@ -67,7 +68,7 @@ func (c *Consumer) ConsumeMetrics(ctx context.Context, metrics pdata.Metrics) er return c.Processor.ProcessBatch(ctx, batch) } -func (c *Consumer) convertMetrics(metrics pdata.Metrics, receiveTimestamp time.Time) *model.Batch { +func (c *Consumer) convertMetrics(metrics pmetric.Metrics, receiveTimestamp time.Time) *model.Batch { batch := model.Batch{} resourceMetrics := metrics.ResourceMetrics() for i := 0; i < resourceMetrics.Len(); i++ { @@ -76,7 +77,7 @@ func (c *Consumer) convertMetrics(metrics pdata.Metrics, receiveTimestamp time.T return &batch } -func (c *Consumer) convertResourceMetrics(resourceMetrics pdata.ResourceMetrics, receiveTimestamp time.Time, out *model.Batch) { +func (c *Consumer) convertResourceMetrics(resourceMetrics pmetric.ResourceMetrics, receiveTimestamp time.Time, out *model.Batch) { var baseEvent model.APMEvent var timeDelta time.Duration resource := resourceMetrics.Resource() @@ -84,9 +85,9 @@ func (c *Consumer) convertResourceMetrics(resourceMetrics pdata.ResourceMetrics, if exportTimestamp, ok := exportTimestamp(resource); ok { timeDelta = receiveTimestamp.Sub(exportTimestamp) } - instrumentationLibraryMetrics := resourceMetrics.InstrumentationLibraryMetrics() - for i := 0; i < instrumentationLibraryMetrics.Len(); i++ { - c.convertInstrumentationLibraryMetrics(instrumentationLibraryMetrics.At(i), baseEvent, timeDelta, out) + scopeMetrics := resourceMetrics.ScopeMetrics() + for i := 0; i < scopeMetrics.Len(); i++ { + c.convertScopeMetrics(scopeMetrics.At(i), baseEvent, timeDelta, out) } } @@ -126,10 +127,10 @@ type jvmMemoryKey struct { } // accumulate processes m, translating to and accumulating equivalent Elastic APM metrics in b. -func (b *apmMetricsBuilder) accumulate(m pdata.Metric) { +func (b *apmMetricsBuilder) accumulate(m pmetric.Metric) { switch m.DataType() { - case pdata.MetricDataTypeSum: + case pmetric.MetricDataTypeSum: dpsCounter := m.Sum().DataPoints() for i := 0; i < dpsCounter.Len(); i++ { dp := dpsCounter.At(i) @@ -155,7 +156,7 @@ func (b *apmMetricsBuilder) accumulate(m pdata.Metric) { } } } - case pdata.MetricDataTypeGauge: + case pmetric.MetricDataTypeGauge: // Gauge metrics accumulation dpsGauge := m.Gauge().DataPoints() for i := 0; i < dpsGauge.Len(); i++ { @@ -177,7 +178,7 @@ func (b *apmMetricsBuilder) accumulate(m pdata.Metric) { } case "runtime.jvm.memory.area": var key jvmMemoryKey - dp.Attributes().Range(func(k string, v pdata.AttributeValue) bool { + dp.Attributes().Range(func(k string, v pcommon.Value) bool { switch k { case "area": key.area = v.AsString() @@ -203,7 +204,7 @@ func (b *apmMetricsBuilder) emit(ms metricsets) { // Direct translation of system.memory.usage (state = free) if b.freeMemoryBytes.value > 0 { ms.upsertOne( - b.freeMemoryBytes.timestamp, "system.memory.actual.free", pdata.NewAttributeMap(), + b.freeMemoryBytes.timestamp, "system.memory.actual.free", pcommon.NewMap(), model.MetricsetSample{Value: b.freeMemoryBytes.value}, ) } @@ -212,7 +213,7 @@ func (b *apmMetricsBuilder) emit(ms metricsets) { totalMemoryBytes := b.freeMemoryBytes.value + b.usedMemoryBytes.value if totalMemoryBytes > 0 { ms.upsertOne( - b.freeMemoryBytes.timestamp, "system.memory.total", pdata.NewAttributeMap(), + b.freeMemoryBytes.timestamp, "system.memory.total", pcommon.NewMap(), model.MetricsetSample{Value: totalMemoryBytes}, ) } @@ -220,15 +221,15 @@ func (b *apmMetricsBuilder) emit(ms metricsets) { // Averaging of non-idle CPU utilization over all CPU cores if b.nonIdleCPUUtilizationSum.value > 0 && b.cpuCount > 0 { ms.upsertOne( - b.nonIdleCPUUtilizationSum.timestamp, "system.cpu.total.norm.pct", pdata.NewAttributeMap(), + b.nonIdleCPUUtilizationSum.timestamp, "system.cpu.total.norm.pct", pcommon.NewMap(), model.MetricsetSample{Value: b.nonIdleCPUUtilizationSum.value / float64(b.cpuCount)}, ) } // jvm.gc.time // Direct translation of runtime.jvm.gc.time or runtime.jvm.gc.collection for k, v := range b.jvmGCTime { - elasticapmAttributes := pdata.NewAttributeMap() - elasticapmAttributes.Insert("name", pdata.NewAttributeValueString(k)) + elasticapmAttributes := pcommon.NewMap() + elasticapmAttributes.Insert("name", pcommon.NewValueString(k)) ms.upsertOne( v.timestamp, "jvm.gc.time", elasticapmAttributes, model.MetricsetSample{Value: v.value}, @@ -237,8 +238,8 @@ func (b *apmMetricsBuilder) emit(ms metricsets) { // jvm.gc.count // Direct translation of runtime.jvm.gc.count for k, v := range b.jvmGCCount { - elasticapmAttributes := pdata.NewAttributeMap() - elasticapmAttributes.Insert("name", pdata.NewAttributeValueString(k)) + elasticapmAttributes := pcommon.NewMap() + elasticapmAttributes.Insert("name", pcommon.NewValueString(k)) ms.upsertOne( v.timestamp, "jvm.gc.count", elasticapmAttributes, model.MetricsetSample{Value: v.value}, @@ -247,10 +248,10 @@ func (b *apmMetricsBuilder) emit(ms metricsets) { // jvm.memory.. // Direct translation of runtime.jvm.memory.area (area = xxx, type = xxx) for k, v := range b.jvmMemory { - elasticapmAttributes := pdata.NewAttributeMap() + elasticapmAttributes := pcommon.NewMap() var elasticapmMetricName string if k.pool != "" { - elasticapmAttributes.Insert("name", pdata.NewAttributeValueString(k.pool)) + elasticapmAttributes.Insert("name", pcommon.NewValueString(k.pool)) elasticapmMetricName = fmt.Sprintf("jvm.memory.%s.pool.%s", k.area, k.jvmType) } else { elasticapmMetricName = fmt.Sprintf("jvm.memory.%s.%s", k.area, k.jvmType) @@ -262,8 +263,8 @@ func (b *apmMetricsBuilder) emit(ms metricsets) { } } -func (c *Consumer) convertInstrumentationLibraryMetrics( - in pdata.InstrumentationLibraryMetrics, +func (c *Consumer) convertScopeMetrics( + in pmetric.ScopeMetrics, baseEvent model.APMEvent, timeDelta time.Duration, out *model.Batch, @@ -286,7 +287,7 @@ func (c *Consumer) convertInstrumentationLibraryMetrics( event.Metricset = &model.Metricset{Samples: ms.samples} if ms.attributes.Len() > 0 { initEventLabels(&event) - ms.attributes.Range(func(k string, v pdata.AttributeValue) bool { + ms.attributes.Range(func(k string, v pcommon.Value) bool { setLabel(k, &event, ifaceAttributeValue(v)) return true }) @@ -304,11 +305,11 @@ func (c *Consumer) convertInstrumentationLibraryMetrics( } } -func (c *Consumer) addMetric(metric pdata.Metric, ms metricsets) bool { +func (c *Consumer) addMetric(metric pmetric.Metric, ms metricsets) bool { // TODO(axw) support units anyDropped := false switch metric.DataType() { - case pdata.MetricDataTypeGauge: + case pmetric.MetricDataTypeGauge: dps := metric.Gauge().DataPoints() for i := 0; i < dps.Len(); i++ { dp := dps.At(i) @@ -319,7 +320,7 @@ func (c *Consumer) addMetric(metric pdata.Metric, ms metricsets) bool { } } return !anyDropped - case pdata.MetricDataTypeSum: + case pmetric.MetricDataTypeSum: dps := metric.Sum().DataPoints() for i := 0; i < dps.Len(); i++ { dp := dps.At(i) @@ -330,7 +331,7 @@ func (c *Consumer) addMetric(metric pdata.Metric, ms metricsets) bool { } } return !anyDropped - case pdata.MetricDataTypeHistogram: + case pmetric.MetricDataTypeHistogram: dps := metric.Histogram().DataPoints() for i := 0; i < dps.Len(); i++ { dp := dps.At(i) @@ -340,7 +341,7 @@ func (c *Consumer) addMetric(metric pdata.Metric, ms metricsets) bool { anyDropped = true } } - case pdata.MetricDataTypeSummary: + case pmetric.MetricDataTypeSummary: dps := metric.Summary().DataPoints() for i := 0; i < dps.Len(); i++ { dp := dps.At(i) @@ -354,12 +355,12 @@ func (c *Consumer) addMetric(metric pdata.Metric, ms metricsets) bool { return !anyDropped } -func numberSample(dp pdata.NumberDataPoint, metricType model.MetricType) (model.MetricsetSample, bool) { +func numberSample(dp pmetric.NumberDataPoint, metricType model.MetricType) (model.MetricsetSample, bool) { var value float64 - switch dp.Type() { - case pdata.MetricValueTypeInt: + switch dp.ValueType() { + case pmetric.NumberDataPointValueTypeInt: value = float64(dp.IntVal()) - case pdata.MetricValueTypeDouble: + case pmetric.NumberDataPointValueTypeDouble: value = dp.DoubleVal() if math.IsNaN(value) || math.IsInf(value, 0) { return model.MetricsetSample{}, false @@ -373,7 +374,7 @@ func numberSample(dp pdata.NumberDataPoint, metricType model.MetricType) (model. }, true } -func summarySample(dp pdata.SummaryDataPoint) model.MetricsetSample { +func summarySample(dp pmetric.SummaryDataPoint) model.MetricsetSample { return model.MetricsetSample{ Type: model.MetricTypeSummary, SummaryMetric: model.SummaryMetric{ @@ -383,7 +384,7 @@ func summarySample(dp pdata.SummaryDataPoint) model.MetricsetSample { } } -func histogramSample(bucketCounts []uint64, explicitBounds []float64) (model.MetricsetSample, bool) { +func histogramSample(bucketCounts pcommon.ImmutableUInt64Slice, explicitBounds pcommon.ImmutableFloat64Slice) (model.MetricsetSample, bool) { // (From opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto) // // This defines size(explicit_bounds) + 1 (= N) buckets. The boundaries for @@ -395,7 +396,7 @@ func histogramSample(bucketCounts []uint64, explicitBounds []float64) (model.Met // // The values in the explicit_bounds array must be strictly increasing. // - if len(bucketCounts) != len(explicitBounds)+1 || len(explicitBounds) == 0 { + if bucketCounts.Len() != explicitBounds.Len()+1 || explicitBounds.Len() == 0 { return model.MetricsetSample{}, false } @@ -409,9 +410,10 @@ func histogramSample(bucketCounts []uint64, explicitBounds []float64) (model.Met // bucket is assumed to be 0 if the upper bound of that bucket is greater than 0. In that // case, the usual linear interpolation is applied within that bucket. Otherwise, the upper // bound of the lowest bucket is returned for quantiles located in the lowest bucket." - values := make([]float64, 0, len(bucketCounts)) - counts := make([]int64, 0, len(bucketCounts)) - for i, count := range bucketCounts { + values := make([]float64, 0, bucketCounts.Len()) + counts := make([]int64, 0, bucketCounts.Len()) + for i := 0; i < bucketCounts.Len(); i++ { + count := bucketCounts.At(i) if count == 0 { continue } @@ -420,19 +422,19 @@ func histogramSample(bucketCounts []uint64, explicitBounds []float64) (model.Met switch i { // (-infinity, explicit_bounds[i]] case 0: - value = explicitBounds[i] + value = explicitBounds.At(i) if value > 0 { value /= 2 } // (explicit_bounds[i], +infinity) - case len(bucketCounts) - 1: - value = explicitBounds[i-1] + case bucketCounts.Len() - 1: + value = explicitBounds.At(i - 1) // [explicit_bounds[i-1], explicit_bounds[i]) default: // Use the midpoint between the boundaries. - value = explicitBounds[i-1] + (explicitBounds[i]-explicitBounds[i-1])/2.0 + value = explicitBounds.At(i-1) + (explicitBounds.At(i)-explicitBounds.At(i-1))/2.0 } counts = append(counts, int64(count)) @@ -455,22 +457,22 @@ type metricsetKey struct { } type metricset struct { - attributes pdata.AttributeMap + attributes pcommon.Map samples map[string]model.MetricsetSample } // upsert searches for an existing metricset with the given timestamp and labels, // and appends the sample to it. If there is no such existing metricset, a new one // is created. -func (ms metricsets) upsert(timestamp time.Time, name string, attributes pdata.AttributeMap, sample model.MetricsetSample) { +func (ms metricsets) upsert(timestamp time.Time, name string, attributes pcommon.Map, sample model.MetricsetSample) { // We always record metrics as they are given. We also copy some // well-known OpenTelemetry metrics to their Elastic APM equivalents. ms.upsertOne(timestamp, name, attributes, sample) } -func (ms metricsets) upsertOne(timestamp time.Time, name string, attributes pdata.AttributeMap, sample model.MetricsetSample) { +func (ms metricsets) upsertOne(timestamp time.Time, name string, attributes pcommon.Map, sample model.MetricsetSample) { var signatureBuilder strings.Builder - attributes.Range(func(k string, v pdata.AttributeValue) bool { + attributes.Range(func(k string, v pcommon.Value) bool { signatureBuilder.WriteString(k) signatureBuilder.WriteString(v.AsString()) return true diff --git a/internal/processor/otel/metrics_test.go b/internal/processor/otel/metrics_test.go index c865807d4c3..2862d2b51e4 100644 --- a/internal/processor/otel/metrics_test.go +++ b/internal/processor/otel/metrics_test.go @@ -42,7 +42,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/elastic/apm-server/internal/model" "github.com/elastic/apm-server/internal/processor/otel" @@ -50,11 +51,11 @@ import ( ) func TestConsumeMetrics(t *testing.T) { - metrics := pdata.NewMetrics() + metrics := pmetric.NewMetrics() resourceMetrics := metrics.ResourceMetrics().AppendEmpty() - instrumentationLibraryMetrics := resourceMetrics.InstrumentationLibraryMetrics().AppendEmpty() - metricSlice := instrumentationLibraryMetrics.Metrics() - appendMetric := func(name string, dataType pdata.MetricDataType) pdata.Metric { + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + metricSlice := scopeMetrics.Metrics() + appendMetric := func(name string, dataType pmetric.MetricDataType) pmetric.Metric { metric := metricSlice.AppendEmpty() metric.SetName(name) metric.SetDataType(dataType) @@ -66,69 +67,70 @@ func TestConsumeMetrics(t *testing.T) { var expectDropped int64 - metric := appendMetric("gauge_metric", pdata.MetricDataTypeGauge) + metric := appendMetric("gauge_metric", pmetric.MetricDataTypeGauge) gauge := metric.Gauge() gaugeDP0 := gauge.DataPoints().AppendEmpty() - gaugeDP0.SetTimestamp(pdata.NewTimestampFromTime(timestamp0)) + gaugeDP0.SetTimestamp(pcommon.NewTimestampFromTime(timestamp0)) gaugeDP0.SetIntVal(1) gaugeDP1 := gauge.DataPoints().AppendEmpty() - gaugeDP1.SetTimestamp(pdata.NewTimestampFromTime(timestamp1)) + gaugeDP1.SetTimestamp(pcommon.NewTimestampFromTime(timestamp1)) gaugeDP1.SetDoubleVal(2.3) gaugeDP1.Attributes().InsertString("k", "v") gaugeDP2 := gauge.DataPoints().AppendEmpty() - gaugeDP2.SetTimestamp(pdata.NewTimestampFromTime(timestamp1)) + gaugeDP2.SetTimestamp(pcommon.NewTimestampFromTime(timestamp1)) gaugeDP2.SetIntVal(4) gaugeDP3 := gauge.DataPoints().AppendEmpty() - gaugeDP3.SetTimestamp(pdata.NewTimestampFromTime(timestamp1)) + gaugeDP3.SetTimestamp(pcommon.NewTimestampFromTime(timestamp1)) gaugeDP3.SetDoubleVal(5.6) gaugeDP3.Attributes().InsertString("k", "v2") - metric = appendMetric("sum_metric", pdata.MetricDataTypeSum) + metric = appendMetric("sum_metric", pmetric.MetricDataTypeSum) sum := metric.Sum() sumDP0 := sum.DataPoints().AppendEmpty() - sumDP0.SetTimestamp(pdata.NewTimestampFromTime(timestamp0)) + sumDP0.SetTimestamp(pcommon.NewTimestampFromTime(timestamp0)) sumDP0.SetIntVal(7) sumDP1 := sum.DataPoints().AppendEmpty() - sumDP1.SetTimestamp(pdata.NewTimestampFromTime(timestamp1)) + sumDP1.SetTimestamp(pcommon.NewTimestampFromTime(timestamp1)) sumDP1.SetDoubleVal(8.9) sumDP1.Attributes().InsertString("k", "v") sumDP2 := sum.DataPoints().AppendEmpty() - sumDP2.SetTimestamp(pdata.NewTimestampFromTime(timestamp1)) + sumDP2.SetTimestamp(pcommon.NewTimestampFromTime(timestamp1)) sumDP2.SetIntVal(10) sumDP2.Attributes().InsertString("k2", "v") sumDP3 := sum.DataPoints().AppendEmpty() - sumDP3.SetTimestamp(pdata.NewTimestampFromTime(timestamp1)) + sumDP3.SetTimestamp(pcommon.NewTimestampFromTime(timestamp1)) sumDP3.SetDoubleVal(11.12) sumDP3.Attributes().InsertString("k", "v2") - metric = appendMetric("histogram_metric", pdata.MetricDataTypeHistogram) + metric = appendMetric("histogram_metric", pmetric.MetricDataTypeHistogram) histogram := metric.Histogram() histogramDP := histogram.DataPoints().AppendEmpty() - histogramDP.SetTimestamp(pdata.NewTimestampFromTime(timestamp0)) - histogramDP.SetBucketCounts([]uint64{1, 1, 2, 3}) - histogramDP.SetExplicitBounds([]float64{-1.0, 2.0, 3.5}) + histogramDP.SetTimestamp(pcommon.NewTimestampFromTime(timestamp0)) + histogramDP.SetBucketCounts(pcommon.NewImmutableUInt64Slice([]uint64{1, 1, 2, 3})) + histogramDP.SetExplicitBounds(pcommon.NewImmutableFloat64Slice([]float64{-1.0, 2.0, 3.5})) - metric = appendMetric("summary_metric", pdata.MetricDataTypeSummary) + metric = appendMetric("summary_metric", pmetric.MetricDataTypeSummary) summaryDP := metric.Summary().DataPoints().AppendEmpty() - summaryDP.SetTimestamp(pdata.NewTimestampFromTime(timestamp0)) + summaryDP.SetTimestamp(pcommon.NewTimestampFromTime(timestamp0)) summaryDP.SetCount(10) summaryDP.SetSum(123.456) summaryDP.QuantileValues().AppendEmpty() // quantiles are not stored - metric = appendMetric("invalid_histogram_metric", pdata.MetricDataTypeHistogram) + metric = appendMetric("invalid_histogram_metric", pmetric.MetricDataTypeHistogram) invalidHistogram := metric.Histogram() invalidHistogramDP := invalidHistogram.DataPoints().AppendEmpty() - invalidHistogramDP.SetTimestamp(pdata.NewTimestampFromTime(timestamp0)) - invalidHistogramDP.SetBucketCounts([]uint64{1, 2, 3}) // should be one more bucket count than bounds - invalidHistogramDP.SetExplicitBounds([]float64{1, 2, 3}) + invalidHistogramDP.SetTimestamp(pcommon.NewTimestampFromTime(timestamp0)) + // should be one more bucket count than bounds + invalidHistogramDP.SetBucketCounts(pcommon.NewImmutableUInt64Slice([]uint64{1, 2, 3})) + invalidHistogramDP.SetExplicitBounds(pcommon.NewImmutableFloat64Slice([]float64{1, 2, 3})) expectDropped++ - metric = appendMetric("invalid_histogram_metric2", pdata.MetricDataTypeHistogram) + metric = appendMetric("invalid_histogram_metric2", pmetric.MetricDataTypeHistogram) invalidHistogram = metric.Histogram() invalidHistogramDP = invalidHistogram.DataPoints().AppendEmpty() - invalidHistogramDP.SetTimestamp(pdata.NewTimestampFromTime(timestamp0)) - invalidHistogramDP.SetBucketCounts([]uint64{1}) - invalidHistogramDP.SetExplicitBounds([]float64{}) // should be non-empty + invalidHistogramDP.SetTimestamp(pcommon.NewTimestampFromTime(timestamp0)) + invalidHistogramDP.SetBucketCounts(pcommon.NewImmutableUInt64Slice([]uint64{1})) + invalidHistogramDP.SetExplicitBounds(pcommon.NewImmutableFloat64Slice([]float64{})) // should be non-empty expectDropped++ events, stats := transformMetrics(t, metrics) @@ -211,11 +213,11 @@ func TestConsumeMetrics(t *testing.T) { func TestConsumeMetricsNaN(t *testing.T) { timestamp := time.Unix(123, 0).UTC() - metrics := pdata.NewMetrics() + metrics := pmetric.NewMetrics() resourceMetrics := metrics.ResourceMetrics().AppendEmpty() - instrumentationLibraryMetrics := resourceMetrics.InstrumentationLibraryMetrics().AppendEmpty() - metricSlice := instrumentationLibraryMetrics.Metrics() - appendMetric := func(name string, dataType pdata.MetricDataType) pdata.Metric { + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + metricSlice := scopeMetrics.Metrics() + appendMetric := func(name string, dataType pmetric.MetricDataType) pmetric.Metric { metric := metricSlice.AppendEmpty() metric.SetName(name) metric.SetDataType(dataType) @@ -223,10 +225,10 @@ func TestConsumeMetricsNaN(t *testing.T) { } for _, value := range []float64{math.NaN(), math.Inf(-1), math.Inf(1)} { - metric := appendMetric("gauge", pdata.MetricDataTypeGauge) + metric := appendMetric("gauge", pmetric.MetricDataTypeGauge) gauge := metric.Gauge() dp := gauge.DataPoints().AppendEmpty() - dp.SetTimestamp(pdata.NewTimestampFromTime(timestamp)) + dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) dp.SetDoubleVal(value) } @@ -236,11 +238,11 @@ func TestConsumeMetricsNaN(t *testing.T) { } func TestConsumeMetricsHostCPU(t *testing.T) { - metrics := pdata.NewMetrics() + metrics := pmetric.NewMetrics() resourceMetrics := metrics.ResourceMetrics().AppendEmpty() - instrumentationLibraryMetrics := resourceMetrics.InstrumentationLibraryMetrics().AppendEmpty() - metricSlice := instrumentationLibraryMetrics.Metrics() - appendMetric := func(name string, dataType pdata.MetricDataType) pdata.Metric { + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + metricSlice := scopeMetrics.Metrics() + appendMetric := func(name string, dataType pmetric.MetricDataType) pmetric.Metric { metric := metricSlice.AppendEmpty() metric.SetName(name) metric.SetDataType(dataType) @@ -248,65 +250,65 @@ func TestConsumeMetricsHostCPU(t *testing.T) { } timestamp := time.Unix(123, 0).UTC() - addFloat64Gauge := func(name string, value float64, attributes map[string]pdata.AttributeValue) { - metric := appendMetric(name, pdata.MetricDataTypeGauge) + addFloat64Gauge := func(name string, value float64, attributes map[string]interface{}) { + metric := appendMetric(name, pmetric.MetricDataTypeGauge) sum := metric.Gauge() dp := sum.DataPoints().AppendEmpty() - dp.SetTimestamp(pdata.NewTimestampFromTime(timestamp)) + dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) dp.SetDoubleVal(value) - pdata.NewAttributeMapFromMap(attributes).CopyTo(dp.Attributes()) + pcommon.NewMapFromRaw(attributes).CopyTo(dp.Attributes()) } - addFloat64Gauge("system.cpu.utilization", 0.8, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("idle"), - "cpu": pdata.NewAttributeValueString("0"), + addFloat64Gauge("system.cpu.utilization", 0.8, map[string]interface{}{ + "state": "idle", + "cpu": "0", }) - addFloat64Gauge("system.cpu.utilization", 0.1, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("system"), - "cpu": pdata.NewAttributeValueString("0"), + addFloat64Gauge("system.cpu.utilization", 0.1, map[string]interface{}{ + "state": "system", + "cpu": "0", }) - addFloat64Gauge("system.cpu.utilization", 0.1, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("user"), - "cpu": pdata.NewAttributeValueString("0"), + addFloat64Gauge("system.cpu.utilization", 0.1, map[string]interface{}{ + "state": "user", + "cpu": "0", }) - addFloat64Gauge("system.cpu.utilization", 0.45, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("idle"), - "cpu": pdata.NewAttributeValueString("1"), + addFloat64Gauge("system.cpu.utilization", 0.45, map[string]interface{}{ + "state": "idle", + "cpu": "1", }) - addFloat64Gauge("system.cpu.utilization", 0.05, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("system"), - "cpu": pdata.NewAttributeValueString("1"), + addFloat64Gauge("system.cpu.utilization", 0.05, map[string]interface{}{ + "state": "system", + "cpu": "1", }) - addFloat64Gauge("system.cpu.utilization", 0.5, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("user"), - "cpu": pdata.NewAttributeValueString("1"), + addFloat64Gauge("system.cpu.utilization", 0.5, map[string]interface{}{ + "state": "user", + "cpu": "1", }) - addFloat64Gauge("system.cpu.utilization", 0.59, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("idle"), - "cpu": pdata.NewAttributeValueString("2"), + addFloat64Gauge("system.cpu.utilization", 0.59, map[string]interface{}{ + "state": "idle", + "cpu": "2", }) - addFloat64Gauge("system.cpu.utilization", 0.01, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("system"), - "cpu": pdata.NewAttributeValueString("2"), + addFloat64Gauge("system.cpu.utilization", 0.01, map[string]interface{}{ + "state": "system", + "cpu": "2", }) - addFloat64Gauge("system.cpu.utilization", 0.4, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("user"), - "cpu": pdata.NewAttributeValueString("2"), + addFloat64Gauge("system.cpu.utilization", 0.4, map[string]interface{}{ + "state": "user", + "cpu": "2", }) - addFloat64Gauge("system.cpu.utilization", 0.6, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("idle"), - "cpu": pdata.NewAttributeValueString("3"), + addFloat64Gauge("system.cpu.utilization", 0.6, map[string]interface{}{ + "state": "idle", + "cpu": "3", }) - addFloat64Gauge("system.cpu.utilization", 0.3, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("system"), - "cpu": pdata.NewAttributeValueString("3"), + addFloat64Gauge("system.cpu.utilization", 0.3, map[string]interface{}{ + "state": "system", + "cpu": "3", }) - addFloat64Gauge("system.cpu.utilization", 0.1, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("user"), - "cpu": pdata.NewAttributeValueString("3"), + addFloat64Gauge("system.cpu.utilization", 0.1, map[string]interface{}{ + "state": "user", + "cpu": "3", }) events, _ := transformMetrics(t, metrics) @@ -496,11 +498,11 @@ func TestConsumeMetricsHostCPU(t *testing.T) { } func TestConsumeMetricsHostMemory(t *testing.T) { - metrics := pdata.NewMetrics() + metrics := pmetric.NewMetrics() resourceMetrics := metrics.ResourceMetrics().AppendEmpty() - instrumentationLibraryMetrics := resourceMetrics.InstrumentationLibraryMetrics().AppendEmpty() - metricSlice := instrumentationLibraryMetrics.Metrics() - appendMetric := func(name string, dataType pdata.MetricDataType) pdata.Metric { + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + metricSlice := scopeMetrics.Metrics() + appendMetric := func(name string, dataType pmetric.MetricDataType) pmetric.Metric { metric := metricSlice.AppendEmpty() metric.SetName(name) metric.SetDataType(dataType) @@ -508,19 +510,19 @@ func TestConsumeMetricsHostMemory(t *testing.T) { } timestamp := time.Unix(123, 0).UTC() - addInt64Sum := func(name string, value int64, attributes map[string]pdata.AttributeValue) { - metric := appendMetric(name, pdata.MetricDataTypeSum) + addInt64Sum := func(name string, value int64, attributes map[string]interface{}) { + metric := appendMetric(name, pmetric.MetricDataTypeSum) sum := metric.Sum() dp := sum.DataPoints().AppendEmpty() - dp.SetTimestamp(pdata.NewTimestampFromTime(timestamp)) + dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) dp.SetIntVal(value) - pdata.NewAttributeMapFromMap(attributes).CopyTo(dp.Attributes()) + pcommon.NewMapFromRaw(attributes).CopyTo(dp.Attributes()) } - addInt64Sum("system.memory.usage", 4773351424, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("free"), + addInt64Sum("system.memory.usage", 4773351424, map[string]interface{}{ + "state": "free", }) - addInt64Sum("system.memory.usage", 3563778048, map[string]pdata.AttributeValue{ - "state": pdata.NewAttributeValueString("used"), + addInt64Sum("system.memory.usage", 3563778048, map[string]interface{}{ + "state": "used", }) events, _ := transformMetrics(t, metrics) service := model.Service{Name: "unknown", Language: model.Language{Name: "unknown"}} @@ -572,11 +574,11 @@ func TestConsumeMetricsHostMemory(t *testing.T) { } func TestConsumeMetrics_JVM(t *testing.T) { - metrics := pdata.NewMetrics() + metrics := pmetric.NewMetrics() resourceMetrics := metrics.ResourceMetrics().AppendEmpty() - instrumentationLibraryMetrics := resourceMetrics.InstrumentationLibraryMetrics().AppendEmpty() - metricSlice := instrumentationLibraryMetrics.Metrics() - appendMetric := func(name string, dataType pdata.MetricDataType) pdata.Metric { + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + metricSlice := scopeMetrics.Metrics() + appendMetric := func(name string, dataType pmetric.MetricDataType) pmetric.Metric { metric := metricSlice.AppendEmpty() metric.SetName(name) metric.SetDataType(dataType) @@ -584,36 +586,36 @@ func TestConsumeMetrics_JVM(t *testing.T) { } timestamp := time.Unix(123, 0).UTC() - addInt64Sum := func(name string, value int64, attributes map[string]pdata.AttributeValue) { - metric := appendMetric(name, pdata.MetricDataTypeSum) + addInt64Sum := func(name string, value int64, attributes map[string]interface{}) { + metric := appendMetric(name, pmetric.MetricDataTypeSum) sum := metric.Sum() dp := sum.DataPoints().AppendEmpty() - dp.SetTimestamp(pdata.NewTimestampFromTime(timestamp)) + dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) dp.SetIntVal(value) - pdata.NewAttributeMapFromMap(attributes).CopyTo(dp.Attributes()) + pcommon.NewMapFromRaw(attributes).CopyTo(dp.Attributes()) } - addInt64Gauge := func(name string, value int64, attributes map[string]pdata.AttributeValue) { - metric := appendMetric(name, pdata.MetricDataTypeGauge) + addInt64Gauge := func(name string, value int64, attributes map[string]interface{}) { + metric := appendMetric(name, pmetric.MetricDataTypeGauge) sum := metric.Gauge() dp := sum.DataPoints().AppendEmpty() - dp.SetTimestamp(pdata.NewTimestampFromTime(timestamp)) + dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) dp.SetIntVal(value) - pdata.NewAttributeMapFromMap(attributes).CopyTo(dp.Attributes()) + pcommon.NewMapFromRaw(attributes).CopyTo(dp.Attributes()) } - addInt64Sum("runtime.jvm.gc.time", 9, map[string]pdata.AttributeValue{ - "gc": pdata.NewAttributeValueString("G1 Young Generation"), + addInt64Sum("runtime.jvm.gc.time", 9, map[string]interface{}{ + "gc": "G1 Young Generation", }) - addInt64Sum("runtime.jvm.gc.count", 2, map[string]pdata.AttributeValue{ - "gc": pdata.NewAttributeValueString("G1 Young Generation"), + addInt64Sum("runtime.jvm.gc.count", 2, map[string]interface{}{ + "gc": "G1 Young Generation", }) - addInt64Gauge("runtime.jvm.memory.area", 42, map[string]pdata.AttributeValue{ - "area": pdata.NewAttributeValueString("heap"), - "type": pdata.NewAttributeValueString("used"), + addInt64Gauge("runtime.jvm.memory.area", 42, map[string]interface{}{ + "area": "heap", + "type": "used", }) - addInt64Gauge("runtime.jvm.memory.area", 24, map[string]pdata.AttributeValue{ - "area": pdata.NewAttributeValueString("heap"), - "type": pdata.NewAttributeValueString("used"), - "pool": pdata.NewAttributeValueString("eden"), + addInt64Gauge("runtime.jvm.memory.area", 24, map[string]interface{}{ + "area": "heap", + "type": "used", + "pool": "eden", }) events, _ := transformMetrics(t, metrics) @@ -710,7 +712,7 @@ func TestConsumeMetrics_JVM(t *testing.T) { } func TestConsumeMetricsExportTimestamp(t *testing.T) { - metrics := pdata.NewMetrics() + metrics := pmetric.NewMetrics() resourceMetrics := metrics.ResourceMetrics().AppendEmpty() // The actual timestamps will be non-deterministic, as they are adjusted @@ -729,13 +731,13 @@ func TestConsumeMetricsExportTimestamp(t *testing.T) { dataPointOffset := -time.Second exportedDataPointTimestamp := exportTimestamp.Add(dataPointOffset) - instrumentationLibraryMetrics := resourceMetrics.InstrumentationLibraryMetrics().AppendEmpty() - metric := instrumentationLibraryMetrics.Metrics().AppendEmpty() + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + metric := scopeMetrics.Metrics().AppendEmpty() metric.SetName("int_gauge") - metric.SetDataType(pdata.MetricDataTypeGauge) + metric.SetDataType(pmetric.MetricDataTypeGauge) intGauge := metric.Gauge() dp := intGauge.DataPoints().AppendEmpty() - dp.SetTimestamp(pdata.NewTimestampFromTime(exportedDataPointTimestamp)) + dp.SetTimestamp(pcommon.NewTimestampFromTime(exportedDataPointTimestamp)) dp.SetIntVal(1) events, _ := transformMetrics(t, metrics) @@ -747,7 +749,7 @@ func TestMetricsLogging(t *testing.T) { for _, level := range []logp.Level{logp.InfoLevel, logp.DebugLevel} { t.Run(level.String(), func(t *testing.T) { logp.DevelopmentSetup(logp.ToObserverOutput(), logp.WithLevel(level)) - transformMetrics(t, pdata.NewMetrics()) + transformMetrics(t, pmetric.NewMetrics()) logs := logp.ObserverLogs().TakeAll() if level == logp.InfoLevel { assert.Empty(t, logs) @@ -758,7 +760,7 @@ func TestMetricsLogging(t *testing.T) { } } -func transformMetrics(t *testing.T, metrics pdata.Metrics) ([]model.APMEvent, otel.ConsumerStats) { +func transformMetrics(t *testing.T, metrics pmetric.Metrics) ([]model.APMEvent, otel.ConsumerStats) { var batches []*model.Batch recorder := batchRecorderBatchProcessor(&batches) diff --git a/internal/processor/otel/test_approved/span_jaeger_http.approved.json b/internal/processor/otel/test_approved/span_jaeger_http.approved.json index ae829d5c68a..e9fc64d7ec0 100644 --- a/internal/processor/otel/test_approved/span_jaeger_http.approved.json +++ b/internal/processor/otel/test_approved/span_jaeger_http.approved.json @@ -398,9 +398,9 @@ } }, "labels": { - "event": "baggage", "isValid": "false" }, + "message": "baggage", "parent": { "id": "0000000058585858" }, diff --git a/internal/processor/otel/test_approved/transaction_jaeger_full.approved.json b/internal/processor/otel/test_approved/transaction_jaeger_full.approved.json index b68ee894867..cab73d7b9d6 100644 --- a/internal/processor/otel/test_approved/transaction_jaeger_full.approved.json +++ b/internal/processor/otel/test_approved/transaction_jaeger_full.approved.json @@ -430,9 +430,9 @@ "version": "1.1" }, "labels": { - "event": "baggage", "isValid": "false" }, + "message": "baggage", "processor": { "event": "log", "name": "log" diff --git a/internal/processor/otel/timestamps.go b/internal/processor/otel/timestamps.go index 17f0403f329..5913b337a1e 100644 --- a/internal/processor/otel/timestamps.go +++ b/internal/processor/otel/timestamps.go @@ -20,13 +20,13 @@ package otel import ( "time" - "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/collector/pdata/pcommon" ) // exportTimestamp extracts the `telemetry.sdk.elastic_export_timestamp` // resource attribute as a timestamp, and returns a boolean indicating // whether the attribute was found. -func exportTimestamp(resource pdata.Resource) (time.Time, bool) { +func exportTimestamp(resource pcommon.Resource) (time.Time, bool) { attr, ok := resource.Attributes().Get("telemetry.sdk.elastic_export_timestamp") if !ok { return time.Time{}, false diff --git a/internal/processor/otel/traces.go b/internal/processor/otel/traces.go index 8b813847cc1..d38e834bac2 100644 --- a/internal/processor/otel/traces.go +++ b/internal/processor/otel/traces.go @@ -45,9 +45,10 @@ import ( "time" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/model/otlp" - "go.opentelemetry.io/collector/model/pdata" - semconv "go.opentelemetry.io/collector/model/semconv/v1.5.0" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "google.golang.org/grpc/codes" "github.com/elastic/elastic-agent-libs/logp" @@ -78,8 +79,8 @@ const ( ) var ( - jsonTracesMarshaler = otlp.NewJSONTracesMarshaler() - jsonMetricsMarshaler = otlp.NewJSONMetricsMarshaler() + jsonTracesMarshaler = ptrace.NewJSONMarshaler() + jsonMetricsMarshaler = pmetric.NewJSONMarshaler() ) // Consumer transforms open-telemetry data to be compatible with elastic APM data @@ -118,7 +119,7 @@ func (c *Consumer) Capabilities() consumer.Capabilities { // ConsumeTraces consumes OpenTelemetry trace data, // converting into Elastic APM events and reporting to the Elastic APM schema. -func (c *Consumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error { +func (c *Consumer) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error { receiveTimestamp := time.Now() logger := logp.NewLogger(logs.Otel) if logger.IsDebug() { @@ -133,7 +134,7 @@ func (c *Consumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error return c.Processor.ProcessBatch(ctx, batch) } -func (c *Consumer) convert(td pdata.Traces, receiveTimestamp time.Time, logger *logp.Logger) *model.Batch { +func (c *Consumer) convert(td ptrace.Traces, receiveTimestamp time.Time, logger *logp.Logger) *model.Batch { batch := model.Batch{} resourceSpans := td.ResourceSpans() for i := 0; i < resourceSpans.Len(); i++ { @@ -143,7 +144,7 @@ func (c *Consumer) convert(td pdata.Traces, receiveTimestamp time.Time, logger * } func (c *Consumer) convertResourceSpans( - resourceSpans pdata.ResourceSpans, + resourceSpans ptrace.ResourceSpans, receiveTimestamp time.Time, logger *logp.Logger, out *model.Batch, @@ -155,16 +156,14 @@ func (c *Consumer) convertResourceSpans( if exportTimestamp, ok := exportTimestamp(resource); ok { timeDelta = receiveTimestamp.Sub(exportTimestamp) } - instrumentationLibrarySpans := resourceSpans.InstrumentationLibrarySpans() - for i := 0; i < instrumentationLibrarySpans.Len(); i++ { - c.convertInstrumentationLibrarySpans( - instrumentationLibrarySpans.At(i), baseEvent, timeDelta, logger, out, - ) + scopeSpans := resourceSpans.ScopeSpans() + for i := 0; i < scopeSpans.Len(); i++ { + c.convertScopeSpans(scopeSpans.At(i), baseEvent, timeDelta, logger, out) } } -func (c *Consumer) convertInstrumentationLibrarySpans( - in pdata.InstrumentationLibrarySpans, +func (c *Consumer) convertScopeSpans( + in ptrace.ScopeSpans, baseEvent model.APMEvent, timeDelta time.Duration, logger *logp.Logger, @@ -172,13 +171,13 @@ func (c *Consumer) convertInstrumentationLibrarySpans( ) { otelSpans := in.Spans() for i := 0; i < otelSpans.Len(); i++ { - c.convertSpan(otelSpans.At(i), in.InstrumentationLibrary(), baseEvent, timeDelta, logger, out) + c.convertSpan(otelSpans.At(i), in.Scope(), baseEvent, timeDelta, logger, out) } } func (c *Consumer) convertSpan( - otelSpan pdata.Span, - otelLibrary pdata.InstrumentationLibrary, + otelSpan ptrace.Span, + otelLibrary pcommon.InstrumentationScope, baseEvent model.APMEvent, timeDelta time.Duration, logger *logp.Logger, @@ -208,7 +207,7 @@ func (c *Consumer) convertSpan( event.Event.Duration = duration event.Event.Outcome = spanStatusOutcome(otelSpan.Status()) event.Parent.ID = parentID - if root || otelSpan.Kind() == pdata.SpanKindServer || otelSpan.Kind() == pdata.SpanKindConsumer { + if root || otelSpan.Kind() == ptrace.SpanKindServer || otelSpan.Kind() == ptrace.SpanKindConsumer { event.Processor = model.TransactionProcessor event.Transaction = &model.Transaction{ ID: spanID, @@ -246,9 +245,9 @@ func (c *Consumer) convertSpan( // TranslateTransaction converts incoming otlp/otel trace data into the // expected elasticsearch format. func TranslateTransaction( - attributes pdata.AttributeMap, - spanStatus pdata.SpanStatus, - library pdata.InstrumentationLibrary, + attributes pcommon.Map, + spanStatus ptrace.SpanStatus, + library pcommon.InstrumentationScope, event *model.APMEvent, ) { isJaeger := strings.HasPrefix(event.Agent.Name, "Jaeger") @@ -271,8 +270,8 @@ func TranslateTransaction( var foundSpanType int var message model.Message - var samplerType, samplerParam pdata.AttributeValue - attributes.Range(func(kDots string, v pdata.AttributeValue) bool { + var samplerType, samplerParam pcommon.Value + attributes.Range(func(kDots string, v pcommon.Value) bool { if isJaeger { switch kDots { case "sampler.type": @@ -286,13 +285,13 @@ func TranslateTransaction( k := replaceDots(kDots) switch v.Type() { - case pdata.AttributeValueTypeArray: + case pcommon.ValueTypeSlice: setLabel(k, event, ifaceAttributeValue(v)) - case pdata.AttributeValueTypeBool: + case pcommon.ValueTypeBool: setLabel(k, event, ifaceAttributeValue(v)) - case pdata.AttributeValueTypeDouble: + case pcommon.ValueTypeDouble: setLabel(k, event, ifaceAttributeValue(v)) - case pdata.AttributeValueTypeInt: + case pcommon.ValueTypeInt: switch kDots { case semconv.AttributeHTTPStatusCode: foundSpanType = httpSpan @@ -307,8 +306,8 @@ func TranslateTransaction( default: setLabel(k, event, ifaceAttributeValue(v)) } - case pdata.AttributeValueTypeMap: - case pdata.AttributeValueTypeString: + case pcommon.ValueTypeMap: + case pcommon.ValueTypeString: stringval := truncate(v.StringVal()) switch kDots { // http.* @@ -449,7 +448,7 @@ func TranslateTransaction( event.Client = model.Client{IP: event.Source.IP, Port: event.Source.Port, Domain: event.Source.Domain} } - if samplerType != (pdata.AttributeValue{}) { + if samplerType != (pcommon.Value{}) { // The client has reported its sampling rate, so we can use it to extrapolate span metrics. parseSamplerAttributes(samplerType, samplerParam, event) } else { @@ -475,7 +474,7 @@ const ( // TranslateSpan converts incoming otlp/otel trace data into the // expected elasticsearch format. -func TranslateSpan(spanKind pdata.SpanKind, attributes pdata.AttributeMap, event *model.APMEvent) { +func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *model.APMEvent) { isJaeger := strings.HasPrefix(event.Agent.Name, "Jaeger") var ( @@ -515,8 +514,8 @@ func TranslateSpan(spanKind pdata.SpanKind, attributes pdata.AttributeMap, event var destinationService model.DestinationService var serviceTarget model.ServiceTarget var foundSpanType int - var samplerType, samplerParam pdata.AttributeValue - attributes.Range(func(kDots string, v pdata.AttributeValue) bool { + var samplerType, samplerParam pcommon.Value + attributes.Range(func(kDots string, v pcommon.Value) bool { if isJaeger { switch kDots { case "sampler.type": @@ -530,9 +529,9 @@ func TranslateSpan(spanKind pdata.SpanKind, attributes pdata.AttributeMap, event k := replaceDots(kDots) switch v.Type() { - case pdata.AttributeValueTypeArray: + case pcommon.ValueTypeSlice: setLabel(k, event, ifaceAttributeValueSlice(v.SliceVal())) - case pdata.AttributeValueTypeBool: + case pcommon.ValueTypeBool: switch kDots { case semconv.AttributeMessagingTempDestination: messageTempDestination = v.BoolVal() @@ -540,9 +539,9 @@ func TranslateSpan(spanKind pdata.SpanKind, attributes pdata.AttributeMap, event default: setLabel(k, event, strconv.FormatBool(v.BoolVal())) } - case pdata.AttributeValueTypeDouble: + case pcommon.ValueTypeDouble: setLabel(k, event, v.DoubleVal()) - case pdata.AttributeValueTypeInt: + case pcommon.ValueTypeInt: switch kDots { case "http.status_code": httpResponse.StatusCode = int(v.IntVal()) @@ -555,7 +554,7 @@ func TranslateSpan(spanKind pdata.SpanKind, attributes pdata.AttributeMap, event default: setLabel(k, event, v.IntVal()) } - case pdata.AttributeValueTypeString: + case pcommon.ValueTypeString: stringval := truncate(v.StringVal()) switch kDots { @@ -761,7 +760,7 @@ func TranslateSpan(spanKind pdata.SpanKind, attributes pdata.AttributeMap, event case messagingSpan: event.Span.Type = "messaging" event.Span.Subtype = messageSystem - if messageOperation == "" && spanKind == pdata.SpanKindProducer { + if messageOperation == "" && spanKind == ptrace.SpanKindProducer { messageOperation = "send" } event.Span.Action = messageOperation @@ -800,7 +799,7 @@ func TranslateSpan(spanKind pdata.SpanKind, attributes pdata.AttributeMap, event // Only set event.Span.Type if not already set if event.Span.Type == "" { switch spanKind { - case pdata.SpanKindInternal: + case ptrace.SpanKindInternal: event.Span.Type = "app" event.Span.Subtype = "internal" default: @@ -824,7 +823,7 @@ func TranslateSpan(spanKind pdata.SpanKind, attributes pdata.AttributeMap, event event.Service.Target = &serviceTarget } - if samplerType != (pdata.AttributeValue{}) { + if samplerType != (pcommon.Value{}) { // The client has reported its sampling rate, so we can use it to extrapolate transaction metrics. parseSamplerAttributes(samplerType, samplerParam, event) } else { @@ -832,7 +831,7 @@ func TranslateSpan(spanKind pdata.SpanKind, attributes pdata.AttributeMap, event } } -func parseSamplerAttributes(samplerType, samplerParam pdata.AttributeValue, event *model.APMEvent) { +func parseSamplerAttributes(samplerType, samplerParam pcommon.Value, event *model.APMEvent) { switch samplerType := samplerType.StringVal(); samplerType { case "probabilistic": probability := samplerParam.DoubleVal() @@ -847,9 +846,9 @@ func parseSamplerAttributes(samplerType, samplerParam pdata.AttributeValue, even default: event.Labels.Set("sampler_type", samplerType) switch samplerParam.Type() { - case pdata.AttributeValueTypeBool: + case pcommon.ValueTypeBool: event.Labels.Set("sampler_param", strconv.FormatBool(samplerParam.BoolVal())) - case pdata.AttributeValueTypeDouble: + case pcommon.ValueTypeDouble: event.NumericLabels.Set("sampler_param", samplerParam.DoubleVal()) } } @@ -857,7 +856,7 @@ func parseSamplerAttributes(samplerType, samplerParam pdata.AttributeValue, even func convertSpanEvent( logger *logp.Logger, - spanEvent pdata.SpanEvent, + spanEvent ptrace.SpanEvent, parent model.APMEvent, // either span or transaction timeDelta time.Duration, ) model.APMEvent { @@ -878,7 +877,7 @@ func convertSpanEvent( // `The name of the event MUST be "exception"` var exceptionEscaped bool var exceptionMessage, exceptionStacktrace, exceptionType string - spanEvent.Attributes().Range(func(k string, v pdata.AttributeValue) bool { + spanEvent.Attributes().Range(func(k string, v pcommon.Value) bool { switch k { case semconv.AttributeExceptionMessage: exceptionMessage = v.StringVal() @@ -912,46 +911,50 @@ func convertSpanEvent( event.Processor = model.LogProcessor event.DataStream.Type = datastreams.LogsType event.Message = spanEvent.Name() - spanEvent.Attributes().Range(func(k string, v pdata.AttributeValue) bool { - setLabel(replaceDots(k), &event, ifaceAttributeValue(v)) + spanEvent.Attributes().Range(func(k string, v pcommon.Value) bool { + k = replaceDots(k) + if isJaeger && k == "message" { + event.Message = truncate(v.StringVal()) + return true + } + setLabel(k, &event, ifaceAttributeValue(v)) return true }) } return event } -func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent, apmEvent *model.APMEvent) *model.Error { +func convertJaegerErrorSpanEvent(logger *logp.Logger, event ptrace.SpanEvent, apmEvent *model.APMEvent) *model.Error { var isError bool var exMessage, exType string - logMessage := event.Name() - hasMinimalInfo := logMessage != "" - event.Attributes().Range(func(k string, v pdata.AttributeValue) bool { - if v.Type() != pdata.AttributeValueTypeString { + var logMessage string + + if name := truncate(event.Name()); name == "error" { + isError = true // according to opentracing spec + } else { + // Jaeger seems to send the message in the 'event' field. + // + // In case 'message' is sent we will use that, otherwise + // we will use 'event'. + logMessage = name + } + + event.Attributes().Range(func(k string, v pcommon.Value) bool { + if v.Type() != pcommon.ValueTypeString { return true } stringval := truncate(v.StringVal()) switch k { case "error", "error.object": exMessage = stringval - hasMinimalInfo = true isError = true - case "event": - if stringval == "error" { // according to opentracing spec - isError = true - } else if logMessage == "" { - // Jaeger seems to send the message in the 'event' field. - // - // In case 'message' is sent, the event's name will be set - // and we will use that. Otherwise we use 'event'. - logMessage = stringval - hasMinimalInfo = true - } case "error.kind": exType = stringval - hasMinimalInfo = true isError = true case "level": isError = stringval == "error" + case "message": + logMessage = stringval default: setLabel(replaceDots(k), apmEvent, ifaceAttributeValue(v)) } @@ -960,7 +963,7 @@ func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent, apm if !isError { return nil } - if !hasMinimalInfo { + if logMessage == "" && exMessage == "" && exType == "" { logger.Debugf("Cannot convert span event (name=%q) into elastic apm error: %v", event.Name()) return nil } @@ -995,7 +998,7 @@ func setErrorContext(out *model.APMEvent, parent model.APMEvent) { } } -func translateSpanLinks(out *model.APMEvent, in pdata.SpanLinkSlice) { +func translateSpanLinks(out *model.APMEvent, in ptrace.SpanLinkSlice) { n := in.Len() if n == 0 { return @@ -1019,11 +1022,11 @@ func replaceDots(s string) string { // spanStatusOutcome returns the outcome for transactions and spans based on // the given OTLP span status. -func spanStatusOutcome(status pdata.SpanStatus) string { +func spanStatusOutcome(status ptrace.SpanStatus) string { switch status.Code() { - case pdata.StatusCodeOk: + case ptrace.StatusCodeOk: return outcomeSuccess - case pdata.StatusCodeError: + case ptrace.StatusCodeError: return outcomeFailure } return outcomeUnknown @@ -1032,11 +1035,11 @@ func spanStatusOutcome(status pdata.SpanStatus) string { // spanStatusResult returns the result for transactions based on the given // OTLP span status. If the span status is unknown, an empty result string // is returned. -func spanStatusResult(status pdata.SpanStatus) string { +func spanStatusResult(status ptrace.SpanStatus) string { switch status.Code() { - case pdata.StatusCodeOk: + case ptrace.StatusCodeOk: return "Success" - case pdata.StatusCodeError: + case ptrace.StatusCodeError: return "Error" } return "" diff --git a/internal/processor/otel/traces_test.go b/internal/processor/otel/traces_test.go index 7a95d8cf715..5e989152830 100644 --- a/internal/processor/otel/traces_test.go +++ b/internal/processor/otel/traces_test.go @@ -47,8 +47,9 @@ import ( jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/model/pdata" - semconv "go.opentelemetry.io/collector/model/semconv/v1.5.0" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "google.golang.org/grpc/codes" "github.com/elastic/elastic-agent-libs/logp" @@ -66,23 +67,23 @@ func TestConsumer_ConsumeTraces_Empty(t *testing.T) { } consumer := otel.Consumer{Processor: processor} - traces := pdata.NewTraces() + traces := ptrace.NewTraces() assert.NoError(t, consumer.ConsumeTraces(context.Background(), traces)) } func TestOutcome(t *testing.T) { - test := func(t *testing.T, expectedOutcome, expectedResult string, statusCode pdata.StatusCode) { + test := func(t *testing.T, expectedOutcome, expectedResult string, statusCode ptrace.StatusCode) { t.Helper() traces, spans := newTracesSpans() otelSpan1 := spans.Spans().AppendEmpty() - otelSpan1.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelSpan1.SetSpanID(pdata.NewSpanID([8]byte{2})) + otelSpan1.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelSpan1.SetSpanID(pcommon.NewSpanID([8]byte{2})) otelSpan1.Status().SetCode(statusCode) otelSpan2 := spans.Spans().AppendEmpty() - otelSpan2.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelSpan2.SetSpanID(pdata.NewSpanID([8]byte{2})) - otelSpan2.SetParentSpanID(pdata.NewSpanID([8]byte{3})) + otelSpan2.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelSpan2.SetSpanID(pcommon.NewSpanID([8]byte{2})) + otelSpan2.SetParentSpanID(pcommon.NewSpanID([8]byte{3})) otelSpan2.Status().SetCode(statusCode) batch := transformTraces(t, traces) @@ -93,20 +94,20 @@ func TestOutcome(t *testing.T) { assert.Equal(t, expectedOutcome, batch[1].Event.Outcome) } - test(t, "unknown", "", pdata.StatusCodeUnset) - test(t, "success", "Success", pdata.StatusCodeOk) - test(t, "failure", "Error", pdata.StatusCodeError) + test(t, "unknown", "", ptrace.StatusCodeUnset) + test(t, "success", "Success", ptrace.StatusCodeOk) + test(t, "failure", "Error", ptrace.StatusCodeError) } func TestRepresentativeCount(t *testing.T) { traces, spans := newTracesSpans() otelSpan1 := spans.Spans().AppendEmpty() - otelSpan1.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelSpan1.SetSpanID(pdata.NewSpanID([8]byte{2})) + otelSpan1.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelSpan1.SetSpanID(pcommon.NewSpanID([8]byte{2})) otelSpan2 := spans.Spans().AppendEmpty() - otelSpan2.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelSpan2.SetSpanID(pdata.NewSpanID([8]byte{2})) - otelSpan2.SetParentSpanID(pdata.NewSpanID([8]byte{3})) + otelSpan2.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelSpan2.SetSpanID(pcommon.NewSpanID([8]byte{2})) + otelSpan2.SetParentSpanID(pcommon.NewSpanID([8]byte{3})) batch := transformTraces(t, traces) require.Len(t, batch, 2) @@ -116,7 +117,7 @@ func TestRepresentativeCount(t *testing.T) { } func TestHTTPTransactionURL(t *testing.T) { - test := func(t *testing.T, expected model.URL, attrs map[string]pdata.AttributeValue) { + test := func(t *testing.T, expected model.URL, attrs map[string]interface{}) { t.Helper() event := transformTransactionWithAttributes(t, attrs) assert.Equal(t, expected, event.URL) @@ -131,10 +132,10 @@ func TestHTTPTransactionURL(t *testing.T) { Query: "bar", Domain: "testing.invalid", Port: 80, - }, map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "http.host": pdata.NewAttributeValueString("testing.invalid:80"), - "http.target": pdata.NewAttributeValueString("/foo?bar"), + }, map[string]interface{}{ + "http.scheme": "https", + "http.host": "testing.invalid:80", + "http.target": "/foo?bar", }) }) t.Run("scheme_servername_nethostport_target", func(t *testing.T) { @@ -146,11 +147,11 @@ func TestHTTPTransactionURL(t *testing.T) { Query: "bar", Domain: "testing.invalid", Port: 80, - }, map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "http.server_name": pdata.NewAttributeValueString("testing.invalid"), - "net.host.port": pdata.NewAttributeValueInt(80), - "http.target": pdata.NewAttributeValueString("/foo?bar"), + }, map[string]interface{}{ + "http.scheme": "https", + "http.server_name": "testing.invalid", + "net.host.port": 80, + "http.target": "/foo?bar", }) }) t.Run("scheme_nethostname_nethostport_target", func(t *testing.T) { @@ -162,11 +163,11 @@ func TestHTTPTransactionURL(t *testing.T) { Query: "bar", Domain: "testing.invalid", Port: 80, - }, map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "net.host.name": pdata.NewAttributeValueString("testing.invalid"), - "net.host.port": pdata.NewAttributeValueInt(80), - "http.target": pdata.NewAttributeValueString("/foo?bar"), + }, map[string]interface{}{ + "http.scheme": "https", + "net.host.name": "testing.invalid", + "net.host.port": 80, + "http.target": "/foo?bar", }) }) t.Run("http.url", func(t *testing.T) { @@ -178,8 +179,8 @@ func TestHTTPTransactionURL(t *testing.T) { Query: "bar", Domain: "testing.invalid", Port: 80, - }, map[string]pdata.AttributeValue{ - "http.url": pdata.NewAttributeValueString("https://testing.invalid:80/foo?bar"), + }, map[string]interface{}{ + "http.url": "https://testing.invalid:80/foo?bar", }) }) t.Run("host_no_port", func(t *testing.T) { @@ -189,10 +190,10 @@ func TestHTTPTransactionURL(t *testing.T) { Full: "https://testing.invalid/foo", Path: "/foo", Domain: "testing.invalid", - }, map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "http.host": pdata.NewAttributeValueString("testing.invalid"), - "http.target": pdata.NewAttributeValueString("/foo"), + }, map[string]interface{}{ + "http.scheme": "https", + "http.host": "testing.invalid", + "http.target": "/foo", }) }) t.Run("ipv6_host_no_port", func(t *testing.T) { @@ -202,10 +203,10 @@ func TestHTTPTransactionURL(t *testing.T) { Full: "https://[::1]/foo", Path: "/foo", Domain: "::1", - }, map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "http.host": pdata.NewAttributeValueString("[::1]"), - "http.target": pdata.NewAttributeValueString("/foo"), + }, map[string]interface{}{ + "http.scheme": "https", + "http.host": "[::1]", + "http.target": "/foo", }) }) t.Run("default_scheme", func(t *testing.T) { @@ -216,60 +217,60 @@ func TestHTTPTransactionURL(t *testing.T) { Full: "http://testing.invalid/foo", Path: "/foo", Domain: "testing.invalid", - }, map[string]pdata.AttributeValue{ - "http.host": pdata.NewAttributeValueString("testing.invalid"), - "http.target": pdata.NewAttributeValueString("/foo"), + }, map[string]interface{}{ + "http.host": "testing.invalid", + "http.target": "/foo", }) }) } func TestHTTPSpanURL(t *testing.T) { - test := func(t *testing.T, expected string, attrs map[string]pdata.AttributeValue) { + test := func(t *testing.T, expected string, attrs map[string]interface{}) { t.Helper() event := transformSpanWithAttributes(t, attrs) assert.Equal(t, model.URL{Original: expected}, event.URL) } t.Run("host.url", func(t *testing.T) { - test(t, "https://testing.invalid:80/foo?bar", map[string]pdata.AttributeValue{ - "http.url": pdata.NewAttributeValueString("https://testing.invalid:80/foo?bar"), + test(t, "https://testing.invalid:80/foo?bar", map[string]interface{}{ + "http.url": "https://testing.invalid:80/foo?bar", }) }) t.Run("scheme_host_target", func(t *testing.T) { - test(t, "https://testing.invalid:80/foo?bar", map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "http.host": pdata.NewAttributeValueString("testing.invalid:80"), - "http.target": pdata.NewAttributeValueString("/foo?bar"), + test(t, "https://testing.invalid:80/foo?bar", map[string]interface{}{ + "http.scheme": "https", + "http.host": "testing.invalid:80", + "http.target": "/foo?bar", }) }) t.Run("scheme_netpeername_netpeerport_target", func(t *testing.T) { - test(t, "https://testing.invalid:80/foo?bar", map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "net.peer.name": pdata.NewAttributeValueString("testing.invalid"), - "net.peer.ip": pdata.NewAttributeValueString("::1"), // net.peer.name preferred - "net.peer.port": pdata.NewAttributeValueInt(80), - "http.target": pdata.NewAttributeValueString("/foo?bar"), + test(t, "https://testing.invalid:80/foo?bar", map[string]interface{}{ + "http.scheme": "https", + "net.peer.name": "testing.invalid", + "net.peer.ip": "::1", // net.peer.name preferred + "net.peer.port": 80, + "http.target": "/foo?bar", }) }) t.Run("scheme_netpeerip_netpeerport_target", func(t *testing.T) { - test(t, "https://[::1]:80/foo?bar", map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "net.peer.ip": pdata.NewAttributeValueString("::1"), - "net.peer.port": pdata.NewAttributeValueInt(80), - "http.target": pdata.NewAttributeValueString("/foo?bar"), + test(t, "https://[::1]:80/foo?bar", map[string]interface{}{ + "http.scheme": "https", + "net.peer.ip": "::1", + "net.peer.port": 80, + "http.target": "/foo?bar", }) }) t.Run("default_scheme", func(t *testing.T) { // scheme is set to "http" if it can't be deduced from attributes. - test(t, "http://testing.invalid/foo", map[string]pdata.AttributeValue{ - "http.host": pdata.NewAttributeValueString("testing.invalid"), - "http.target": pdata.NewAttributeValueString("/foo"), + test(t, "http://testing.invalid/foo", map[string]interface{}{ + "http.host": "testing.invalid", + "http.target": "/foo", }) }) } func TestHTTPSpanDestination(t *testing.T) { - test := func(t *testing.T, expectedDestination model.Destination, expectedDestinationService *model.DestinationService, attrs map[string]pdata.AttributeValue) { + test := func(t *testing.T, expectedDestination model.Destination, expectedDestinationService *model.DestinationService, attrs map[string]interface{}) { t.Helper() event := transformSpanWithAttributes(t, attrs) assert.Equal(t, expectedDestination, event.Destination) @@ -284,8 +285,8 @@ func TestHTTPSpanDestination(t *testing.T) { Type: "external", Name: "https://testing.invalid", Resource: "testing.invalid:443", - }, map[string]pdata.AttributeValue{ - "http.url": pdata.NewAttributeValueString("https://testing.invalid:443/foo?bar"), + }, map[string]interface{}{ + "http.url": "https://testing.invalid:443/foo?bar", }) }) t.Run("url_port_scheme", func(t *testing.T) { @@ -296,8 +297,8 @@ func TestHTTPSpanDestination(t *testing.T) { Type: "external", Name: "https://testing.invalid", Resource: "testing.invalid:443", - }, map[string]pdata.AttributeValue{ - "http.url": pdata.NewAttributeValueString("https://testing.invalid/foo?bar"), + }, map[string]interface{}{ + "http.url": "https://testing.invalid/foo?bar", }) }) t.Run("url_non_default_port", func(t *testing.T) { @@ -308,8 +309,8 @@ func TestHTTPSpanDestination(t *testing.T) { Type: "external", Name: "https://testing.invalid:444", Resource: "testing.invalid:444", - }, map[string]pdata.AttributeValue{ - "http.url": pdata.NewAttributeValueString("https://testing.invalid:444/foo?bar"), + }, map[string]interface{}{ + "http.url": "https://testing.invalid:444/foo?bar", }) }) t.Run("scheme_host_target", func(t *testing.T) { @@ -320,10 +321,10 @@ func TestHTTPSpanDestination(t *testing.T) { Type: "external", Name: "https://testing.invalid:444", Resource: "testing.invalid:444", - }, map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "http.host": pdata.NewAttributeValueString("testing.invalid:444"), - "http.target": pdata.NewAttributeValueString("/foo?bar"), + }, map[string]interface{}{ + "http.scheme": "https", + "http.host": "testing.invalid:444", + "http.target": "/foo?bar", }) }) t.Run("scheme_netpeername_nethostport_target", func(t *testing.T) { @@ -334,21 +335,21 @@ func TestHTTPSpanDestination(t *testing.T) { Type: "external", Name: "https://[::1]:444", Resource: "[::1]:444", - }, map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "net.peer.ip": pdata.NewAttributeValueString("::1"), - "net.peer.port": pdata.NewAttributeValueInt(444), - "http.target": pdata.NewAttributeValueString("/foo?bar"), + }, map[string]interface{}{ + "http.scheme": "https", + "net.peer.ip": "::1", + "net.peer.port": 444, + "http.target": "/foo?bar", }) }) } func TestHTTPTransactionSource(t *testing.T) { - test := func(t *testing.T, expectedDomain, expectedIP string, expectedPort int, attrs map[string]pdata.AttributeValue) { + test := func(t *testing.T, expectedDomain, expectedIP string, expectedPort int, attrs map[string]interface{}) { // "http.method" is a required attribute for HTTP spans, // and its presence causes the transaction's HTTP request // context to be built. - attrs["http.method"] = pdata.NewAttributeValueString("POST") + attrs["http.method"] = "POST" event := transformTransactionWithAttributes(t, attrs) require.NotNil(t, event.HTTP) @@ -365,51 +366,51 @@ func TestHTTPTransactionSource(t *testing.T) { } t.Run("net.peer.ip_port", func(t *testing.T) { - test(t, "", "192.168.0.1", 1234, map[string]pdata.AttributeValue{ - "net.peer.ip": pdata.NewAttributeValueString("192.168.0.1"), - "net.peer.port": pdata.NewAttributeValueInt(1234), + test(t, "", "192.168.0.1", 1234, map[string]interface{}{ + "net.peer.ip": "192.168.0.1", + "net.peer.port": 1234, }) }) t.Run("net.peer.ip", func(t *testing.T) { - test(t, "", "192.168.0.1", 0, map[string]pdata.AttributeValue{ - "net.peer.ip": pdata.NewAttributeValueString("192.168.0.1"), + test(t, "", "192.168.0.1", 0, map[string]interface{}{ + "net.peer.ip": "192.168.0.1", }) }) t.Run("net.peer.ip_name", func(t *testing.T) { - test(t, "source.domain", "192.168.0.1", 0, map[string]pdata.AttributeValue{ - "net.peer.name": pdata.NewAttributeValueString("source.domain"), - "net.peer.ip": pdata.NewAttributeValueString("192.168.0.1"), + test(t, "source.domain", "192.168.0.1", 0, map[string]interface{}{ + "net.peer.name": "source.domain", + "net.peer.ip": "192.168.0.1", }) }) } func TestHTTPTransactionFlavor(t *testing.T) { - event := transformTransactionWithAttributes(t, map[string]pdata.AttributeValue{ - "http.flavor": pdata.NewAttributeValueString("1.1"), + event := transformTransactionWithAttributes(t, map[string]interface{}{ + "http.flavor": "1.1", }) assert.Equal(t, "1.1", event.HTTP.Version) } func TestHTTPTransactionUserAgent(t *testing.T) { - event := transformTransactionWithAttributes(t, map[string]pdata.AttributeValue{ - "http.user_agent": pdata.NewAttributeValueString("Foo/bar (baz)"), + event := transformTransactionWithAttributes(t, map[string]interface{}{ + "http.user_agent": "Foo/bar (baz)", }) assert.Equal(t, model.UserAgent{Original: "Foo/bar (baz)"}, event.UserAgent) } func TestHTTPTransactionClientIP(t *testing.T) { - event := transformTransactionWithAttributes(t, map[string]pdata.AttributeValue{ - "net.peer.ip": pdata.NewAttributeValueString("1.2.3.4"), - "net.peer.port": pdata.NewAttributeValueInt(5678), - "http.client_ip": pdata.NewAttributeValueString("9.10.11.12"), + event := transformTransactionWithAttributes(t, map[string]interface{}{ + "net.peer.ip": "1.2.3.4", + "net.peer.port": 5678, + "http.client_ip": "9.10.11.12", }) assert.Equal(t, model.Source{IP: net.ParseIP("1.2.3.4"), Port: 5678}, event.Source) assert.Equal(t, model.Client{IP: net.ParseIP("9.10.11.12")}, event.Client) } func TestHTTPTransactionStatusCode(t *testing.T) { - event := transformTransactionWithAttributes(t, map[string]pdata.AttributeValue{ - "http.status_code": pdata.NewAttributeValueInt(200), + event := transformTransactionWithAttributes(t, map[string]interface{}{ + "http.status_code": 200, }) assert.Equal(t, 200, event.HTTP.Response.StatusCode) } @@ -418,16 +419,16 @@ func TestDatabaseSpan(t *testing.T) { // https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/database.md#mysql connectionString := "Server=shopdb.example.com;Database=ShopDb;Uid=billing_user;TableCache=true;UseCompression=True;MinimumPoolSize=10;MaximumPoolSize=50;" dbStatement := fmt.Sprintf("SELECT * FROM orders WHERE order_id = '%s'", strings.Repeat("*", 1024)) // should not be truncated! - event := transformSpanWithAttributes(t, map[string]pdata.AttributeValue{ - "db.system": pdata.NewAttributeValueString("mysql"), - "db.connection_string": pdata.NewAttributeValueString(connectionString), - "db.user": pdata.NewAttributeValueString("billing_user"), - "db.name": pdata.NewAttributeValueString("ShopDb"), - "db.statement": pdata.NewAttributeValueString(dbStatement), - "net.peer.name": pdata.NewAttributeValueString("shopdb.example.com"), - "net.peer.ip": pdata.NewAttributeValueString("192.0.2.12"), - "net.peer.port": pdata.NewAttributeValueInt(3306), - "net.transport": pdata.NewAttributeValueString("IP.TCP"), + event := transformSpanWithAttributes(t, map[string]interface{}{ + "db.system": "mysql", + "db.connection_string": connectionString, + "db.user": "billing_user", + "db.name": "ShopDb", + "db.statement": dbStatement, + "net.peer.name": "shopdb.example.com", + "net.peer.ip": "192.0.2.12", + "net.peer.port": 3306, + "net.transport": "IP.TCP", }) assert.Equal(t, "db", event.Span.Type) @@ -460,11 +461,11 @@ func TestDatabaseSpan(t *testing.T) { func TestInstrumentationLibrary(t *testing.T) { traces, spans := newTracesSpans() - spans.InstrumentationLibrary().SetName("library-name") - spans.InstrumentationLibrary().SetVersion("1.2.3") + spans.Scope().SetName("library-name") + spans.Scope().SetVersion("1.2.3") otelSpan := spans.Spans().AppendEmpty() - otelSpan.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2})) + otelSpan.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelSpan.SetSpanID(pcommon.NewSpanID([8]byte{2})) events := transformTraces(t, traces) event := events[0] @@ -473,14 +474,14 @@ func TestInstrumentationLibrary(t *testing.T) { } func TestRPCTransaction(t *testing.T) { - event := transformTransactionWithAttributes(t, map[string]pdata.AttributeValue{ - "rpc.system": pdata.NewAttributeValueString("grpc"), - "rpc.service": pdata.NewAttributeValueString("myservice.EchoService"), - "rpc.method": pdata.NewAttributeValueString("exampleMethod"), - "rpc.grpc.status_code": pdata.NewAttributeValueInt(int64(codes.Unavailable)), - "net.peer.name": pdata.NewAttributeValueString("peer_name"), - "net.peer.ip": pdata.NewAttributeValueString("10.20.30.40"), - "net.peer.port": pdata.NewAttributeValueInt(123), + event := transformTransactionWithAttributes(t, map[string]interface{}{ + "rpc.system": "grpc", + "rpc.service": "myservice.EchoService", + "rpc.method": "exampleMethod", + "rpc.grpc.status_code": int64(codes.Unavailable), + "net.peer.name": "peer_name", + "net.peer.ip": "10.20.30.40", + "net.peer.port": 123, }) assert.Equal(t, "request", event.Transaction.Type) assert.Equal(t, "Unavailable", event.Transaction.Result) @@ -493,13 +494,13 @@ func TestRPCTransaction(t *testing.T) { } func TestRPCSpan(t *testing.T) { - event := transformSpanWithAttributes(t, map[string]pdata.AttributeValue{ - "rpc.system": pdata.NewAttributeValueString("grpc"), - "rpc.service": pdata.NewAttributeValueString("myservice.EchoService"), - "rpc.method": pdata.NewAttributeValueString("exampleMethod"), - "rpc.grpc.status_code": pdata.NewAttributeValueInt(int64(codes.Unavailable)), - "net.peer.ip": pdata.NewAttributeValueString("10.20.30.40"), - "net.peer.port": pdata.NewAttributeValueInt(123), + event := transformSpanWithAttributes(t, map[string]interface{}{ + "rpc.system": "grpc", + "rpc.service": "myservice.EchoService", + "rpc.method": "exampleMethod", + "rpc.grpc.status_code": int64(codes.Unavailable), + "net.peer.ip": "10.20.30.40", + "net.peer.port": 123, }) assert.Equal(t, "external", event.Span.Type) assert.Equal(t, "grpc", event.Span.Subtype) @@ -516,14 +517,14 @@ func TestRPCSpan(t *testing.T) { } func TestMessagingTransaction(t *testing.T) { - event := transformTransactionWithAttributes(t, map[string]pdata.AttributeValue{ - "messaging.destination": pdata.NewAttributeValueString("myQueue"), - }, func(s pdata.Span) { - s.SetKind(pdata.SpanKindConsumer) + event := transformTransactionWithAttributes(t, map[string]interface{}{ + "messaging.destination": "myQueue", + }, func(s ptrace.Span) { + s.SetKind(ptrace.SpanKindConsumer) // Set parentID to imply this isn't the root, but // kind==Consumer should still force the span to be translated // as a transaction. - s.SetParentSpanID(pdata.NewSpanID([8]byte{3})) + s.SetParentSpanID(pcommon.NewSpanID([8]byte{3})) }) assert.Equal(t, "messaging", event.Transaction.Type) assert.Empty(t, event.Labels) @@ -533,13 +534,13 @@ func TestMessagingTransaction(t *testing.T) { } func TestMessagingSpan(t *testing.T) { - event := transformSpanWithAttributes(t, map[string]pdata.AttributeValue{ - "messaging.system": pdata.NewAttributeValueString("kafka"), - "messaging.destination": pdata.NewAttributeValueString("myTopic"), - "net.peer.ip": pdata.NewAttributeValueString("10.20.30.40"), - "net.peer.port": pdata.NewAttributeValueInt(123), - }, func(s pdata.Span) { - s.SetKind(pdata.SpanKindProducer) + event := transformSpanWithAttributes(t, map[string]interface{}{ + "messaging.system": "kafka", + "messaging.destination": "myTopic", + "net.peer.ip": "10.20.30.40", + "net.peer.port": 123, + }, func(s ptrace.Span) { + s.SetKind(ptrace.SpanKindProducer) }) assert.Equal(t, "messaging", event.Span.Type) assert.Equal(t, "kafka", event.Span.Subtype) @@ -557,7 +558,7 @@ func TestMessagingSpan(t *testing.T) { } func TestMessagingSpan_DestinationResource(t *testing.T) { - test := func(t *testing.T, expectedDestination model.Destination, expectedDestinationService *model.DestinationService, attrs map[string]pdata.AttributeValue) { + test := func(t *testing.T, expectedDestination model.Destination, expectedDestinationService *model.DestinationService, attrs map[string]interface{}) { t.Helper() event := transformSpanWithAttributes(t, attrs) assert.Equal(t, expectedDestination, event.Destination) @@ -571,11 +572,11 @@ func TestMessagingSpan_DestinationResource(t *testing.T) { Type: "messaging", Name: "testsvc", Resource: "127.0.0.1/testtopic", - }, map[string]pdata.AttributeValue{ - "messaging.system": pdata.NewAttributeValueString("kafka"), - "messaging.destination": pdata.NewAttributeValueString("testtopic"), - "peer.service": pdata.NewAttributeValueString("testsvc"), - "peer.address": pdata.NewAttributeValueString("127.0.0.1"), + }, map[string]interface{}{ + "messaging.system": "kafka", + "messaging.destination": "testtopic", + "peer.service": "testsvc", + "peer.address": "127.0.0.1", }) }) t.Run("system_destination_peerservice", func(t *testing.T) { @@ -583,10 +584,10 @@ func TestMessagingSpan_DestinationResource(t *testing.T) { Type: "messaging", Name: "testsvc", Resource: "testsvc/testtopic", - }, map[string]pdata.AttributeValue{ - "messaging.system": pdata.NewAttributeValueString("kafka"), - "messaging.destination": pdata.NewAttributeValueString("testtopic"), - "peer.service": pdata.NewAttributeValueString("testsvc"), + }, map[string]interface{}{ + "messaging.system": "kafka", + "messaging.destination": "testtopic", + "peer.service": "testsvc", }) }) t.Run("system_destination", func(t *testing.T) { @@ -594,37 +595,37 @@ func TestMessagingSpan_DestinationResource(t *testing.T) { Type: "messaging", Name: "kafka", Resource: "kafka/testtopic", - }, map[string]pdata.AttributeValue{ - "messaging.system": pdata.NewAttributeValueString("kafka"), - "messaging.destination": pdata.NewAttributeValueString("testtopic"), + }, map[string]interface{}{ + "messaging.system": "kafka", + "messaging.destination": "testtopic", }) }) } func TestSpanType(t *testing.T) { // Internal spans default to app.internal. - event := transformSpanWithAttributes(t, map[string]pdata.AttributeValue{}, func(s pdata.Span) { - s.SetKind(pdata.SpanKindInternal) + event := transformSpanWithAttributes(t, map[string]interface{}{}, func(s ptrace.Span) { + s.SetKind(ptrace.SpanKindInternal) }) assert.Equal(t, "app", event.Span.Type) assert.Equal(t, "internal", event.Span.Subtype) // All other spans default to unknown. - event = transformSpanWithAttributes(t, map[string]pdata.AttributeValue{}, func(s pdata.Span) { - s.SetKind(pdata.SpanKindClient) + event = transformSpanWithAttributes(t, map[string]interface{}{}, func(s ptrace.Span) { + s.SetKind(ptrace.SpanKindClient) }) assert.Equal(t, "unknown", event.Span.Type) assert.Equal(t, "", event.Span.Subtype) } func TestSpanNetworkAttributes(t *testing.T) { - networkAttributes := map[string]pdata.AttributeValue{ - "net.host.connection.type": pdata.NewAttributeValueString("cell"), - "net.host.connection.subtype": pdata.NewAttributeValueString("LTE"), - "net.host.carrier.name": pdata.NewAttributeValueString("Vodafone"), - "net.host.carrier.mnc": pdata.NewAttributeValueString("01"), - "net.host.carrier.mcc": pdata.NewAttributeValueString("101"), - "net.host.carrier.icc": pdata.NewAttributeValueString("UK"), + networkAttributes := map[string]interface{}{ + "net.host.connection.type": "cell", + "net.host.connection.subtype": "LTE", + "net.host.carrier.name": "Vodafone", + "net.host.carrier.mnc": "01", + "net.host.carrier.mcc": "101", + "net.host.carrier.icc": "UK", } txEvent := transformTransactionWithAttributes(t, networkAttributes) spanEvent := transformSpanWithAttributes(t, networkAttributes) @@ -646,23 +647,12 @@ func TestSpanNetworkAttributes(t *testing.T) { } func TestArrayLabels(t *testing.T) { - stringArray := pdata.NewAttributeValueArray() - stringArray.SliceVal().AppendEmpty().SetStringVal("string1") - stringArray.SliceVal().AppendEmpty().SetStringVal("string2") + stringArray := []interface{}{"string1", "string2"} + boolArray := []interface{}{false, true} + intArray := []interface{}{1234, 5678} + floatArray := []interface{}{1234.5678, 9123.234123123} - boolArray := pdata.NewAttributeValueArray() - boolArray.SliceVal().AppendEmpty().SetBoolVal(false) - boolArray.SliceVal().AppendEmpty().SetBoolVal(true) - - intArray := pdata.NewAttributeValueArray() - intArray.SliceVal().AppendEmpty().SetIntVal(1234) - intArray.SliceVal().AppendEmpty().SetIntVal(5678) - - floatArray := pdata.NewAttributeValueArray() - floatArray.SliceVal().AppendEmpty().SetDoubleVal(1234.5678) - floatArray.SliceVal().AppendEmpty().SetDoubleVal(9123.234123123) - - txEvent := transformTransactionWithAttributes(t, map[string]pdata.AttributeValue{ + txEvent := transformTransactionWithAttributes(t, map[string]interface{}{ "string_array": stringArray, "bool_array": boolArray, "int_array": intArray, @@ -677,7 +667,7 @@ func TestArrayLabels(t *testing.T) { "float_array": {Values: []float64{1234.5678, 9123.234123123}}, }, txEvent.NumericLabels) - spanEvent := transformSpanWithAttributes(t, map[string]pdata.AttributeValue{ + spanEvent := transformSpanWithAttributes(t, map[string]interface{}{ "string_array": stringArray, "bool_array": boolArray, "int_array": intArray, @@ -720,20 +710,20 @@ func TestConsumeTracesExportTimestamp(t *testing.T) { exportedExceptionTimestamp := exportTimestamp.Add(exceptionOffset) otelSpan1 := otelSpans.Spans().AppendEmpty() - otelSpan1.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelSpan1.SetSpanID(pdata.NewSpanID([8]byte{2})) - otelSpan1.SetStartTimestamp(pdata.NewTimestampFromTime(exportedTransactionTimestamp)) - otelSpan1.SetEndTimestamp(pdata.NewTimestampFromTime(exportedTransactionTimestamp.Add(transactionDuration))) + otelSpan1.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelSpan1.SetSpanID(pcommon.NewSpanID([8]byte{2})) + otelSpan1.SetStartTimestamp(pcommon.NewTimestampFromTime(exportedTransactionTimestamp)) + otelSpan1.SetEndTimestamp(pcommon.NewTimestampFromTime(exportedTransactionTimestamp.Add(transactionDuration))) otelSpan2 := otelSpans.Spans().AppendEmpty() - otelSpan2.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelSpan2.SetSpanID(pdata.NewSpanID([8]byte{2})) - otelSpan2.SetParentSpanID(pdata.NewSpanID([8]byte{3})) - otelSpan2.SetStartTimestamp(pdata.NewTimestampFromTime(exportedSpanTimestamp)) - otelSpan2.SetEndTimestamp(pdata.NewTimestampFromTime(exportedSpanTimestamp.Add(spanDuration))) + otelSpan2.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelSpan2.SetSpanID(pcommon.NewSpanID([8]byte{2})) + otelSpan2.SetParentSpanID(pcommon.NewSpanID([8]byte{3})) + otelSpan2.SetStartTimestamp(pcommon.NewTimestampFromTime(exportedSpanTimestamp)) + otelSpan2.SetEndTimestamp(pcommon.NewTimestampFromTime(exportedSpanTimestamp.Add(spanDuration))) otelSpanEvent := otelSpan2.Events().AppendEmpty() - otelSpanEvent.SetTimestamp(pdata.NewTimestampFromTime(exportedExceptionTimestamp)) + otelSpanEvent.SetTimestamp(pcommon.NewTimestampFromTime(exportedExceptionTimestamp)) otelSpanEvent.SetName("exception") otelSpanEvent.Attributes().InsertString("exception.type", "the_type") otelSpanEvent.Attributes().InsertString("exception.message", "the_message") @@ -753,16 +743,16 @@ func TestConsumeTracesExportTimestamp(t *testing.T) { } func TestSpanLinks(t *testing.T) { - linkedTraceID := pdata.NewTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}) - linkedSpanID := pdata.NewSpanID([8]byte{7, 6, 5, 4, 3, 2, 1, 0}) - spanLink := pdata.NewSpanLink() + linkedTraceID := pcommon.NewTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}) + linkedSpanID := pcommon.NewSpanID([8]byte{7, 6, 5, 4, 3, 2, 1, 0}) + spanLink := ptrace.NewSpanLink() spanLink.SetSpanID(linkedSpanID) spanLink.SetTraceID(linkedTraceID) - txEvent := transformTransactionWithAttributes(t, map[string]pdata.AttributeValue{}, func(span pdata.Span) { + txEvent := transformTransactionWithAttributes(t, map[string]interface{}{}, func(span ptrace.Span) { spanLink.CopyTo(span.Links().AppendEmpty()) }) - spanEvent := transformTransactionWithAttributes(t, map[string]pdata.AttributeValue{}, func(span pdata.Span) { + spanEvent := transformTransactionWithAttributes(t, map[string]interface{}{}, func(span ptrace.Span) { spanLink.CopyTo(span.Links().AppendEmpty()) }) for _, event := range []model.APMEvent{txEvent, spanEvent} { @@ -774,7 +764,7 @@ func TestSpanLinks(t *testing.T) { } func TestConsumer_JaegerMetadata(t *testing.T) { - jaegerBatch := jaegermodel.Batch{ + jaegerBatch := &jaegermodel.Batch{ Spans: []*jaegermodel.Span{{ StartTime: testStartTime(), Tags: []jaegermodel.KeyValue{jaegerKeyValue("span.kind", "client")}, @@ -812,7 +802,8 @@ func TestConsumer_JaegerMetadata(t *testing.T) { var batches []*model.Batch recorder := batchRecorderBatchProcessor(&batches) jaegerBatch.Process = tc.process - traces := jaegertranslator.ProtoBatchToInternalTraces(jaegerBatch) + traces, err := jaegertranslator.ProtoToTraces([]*jaegermodel.Batch{jaegerBatch}) + require.NoError(t, err) require.NoError(t, (&otel.Consumer{Processor: recorder}).ConsumeTraces(context.Background(), traces)) docs := encodeBatch(t, batches...) @@ -822,7 +813,7 @@ func TestConsumer_JaegerMetadata(t *testing.T) { } func TestConsumer_JaegerSampleRate(t *testing.T) { - jaegerBatch := jaegermodel.Batch{ + traces, err := jaegertranslator.ProtoToTraces([]*jaegermodel.Batch{{ Process: jaegermodel.NewProcess("", jaegerKeyValues( "jaeger.version", "unknown", "hostname", "host-abc", @@ -858,8 +849,8 @@ func TestConsumer_JaegerSampleRate(t *testing.T) { jaegerKeyValue("sampler.param", 2.0), // 2 traces per second }, }}, - } - traces := jaegertranslator.ProtoBatchToInternalTraces(jaegerBatch) + }}) + require.NoError(t, err) var batches []*model.Batch recorder := batchRecorderBatchProcessor(&batches) @@ -882,7 +873,7 @@ func TestConsumer_JaegerTraceID(t *testing.T) { var batches []*model.Batch recorder := batchRecorderBatchProcessor(&batches) - jaegerBatch := jaegermodel.Batch{ + traces, err := jaegertranslator.ProtoToTraces([]*jaegermodel.Batch{{ Process: jaegermodel.NewProcess("", jaegerKeyValues("jaeger.version", "unknown")), Spans: []*jaegermodel.Span{{ TraceID: jaegermodel.NewTraceID(0, 0x000046467830), @@ -891,8 +882,8 @@ func TestConsumer_JaegerTraceID(t *testing.T) { TraceID: jaegermodel.NewTraceID(0x000046467830, 0x000046467830), SpanID: jaegermodel.NewSpanID(789), }}, - } - traces := jaegertranslator.ProtoBatchToInternalTraces(jaegerBatch) + }}) + require.NoError(t, err) require.NoError(t, (&otel.Consumer{Processor: recorder}).ConsumeTraces(context.Background(), traces)) batch := *batches[0] @@ -1007,14 +998,14 @@ func TestConsumer_JaegerTransaction(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - batch := jaegermodel.Batch{ + traces, err := jaegertranslator.ProtoToTraces([]*jaegermodel.Batch{{ Process: jaegermodel.NewProcess("", []jaegermodel.KeyValue{ jaegerKeyValue("hostname", "host-abc"), jaegerKeyValue("jaeger.version", "unknown"), }), Spans: tc.spans, - } - traces := jaegertranslator.ProtoBatchToInternalTraces(batch) + }}) + require.NoError(t, err) var batches []*model.Batch recorder := batchRecorderBatchProcessor(&batches) @@ -1111,7 +1102,7 @@ func TestConsumer_JaegerSpan(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - batch := jaegermodel.Batch{ + batch := &jaegermodel.Batch{ Process: jaegermodel.NewProcess("", []jaegermodel.KeyValue{ jaegerKeyValue("hostname", "host-abc"), jaegerKeyValue("jaeger.version", "unknown"), @@ -1129,7 +1120,8 @@ func TestConsumer_JaegerSpan(t *testing.T) { SpanID: 0x58585858, }} } - traces := jaegertranslator.ProtoBatchToInternalTraces(batch) + traces, err := jaegertranslator.ProtoToTraces([]*jaegermodel.Batch{batch}) + require.NoError(t, err) var batches []*model.Batch recorder := batchRecorderBatchProcessor(&batches) @@ -1142,7 +1134,7 @@ func TestConsumer_JaegerSpan(t *testing.T) { } func TestJaegerServiceVersion(t *testing.T) { - jaegerBatch := jaegermodel.Batch{ + traces, err := jaegertranslator.ProtoToTraces([]*jaegermodel.Batch{{ Process: jaegermodel.NewProcess("", jaegerKeyValues( "jaeger.version", "unknown", "service.version", "process_tag_value", @@ -1157,8 +1149,8 @@ func TestJaegerServiceVersion(t *testing.T) { jaegerKeyValue("service.version", "span_tag_value"), }, }}, - } - traces := jaegertranslator.ProtoBatchToInternalTraces(jaegerBatch) + }}) + require.NoError(t, err) var batches []*model.Batch recorder := batchRecorderBatchProcessor(&batches) @@ -1173,7 +1165,7 @@ func TestTracesLogging(t *testing.T) { for _, level := range []logp.Level{logp.InfoLevel, logp.DebugLevel} { t.Run(level.String(), func(t *testing.T) { logp.DevelopmentSetup(logp.ToObserverOutput(), logp.WithLevel(level)) - transformTraces(t, pdata.NewTraces()) + transformTraces(t, ptrace.NewTraces()) logs := logp.ObserverLogs().TakeAll() if level == logp.InfoLevel { assert.Empty(t, logs) @@ -1185,7 +1177,7 @@ func TestTracesLogging(t *testing.T) { } func TestServiceTarget(t *testing.T) { - test := func(t *testing.T, expected *model.ServiceTarget, input map[string]pdata.AttributeValue) { + test := func(t *testing.T, expected *model.ServiceTarget, input map[string]interface{}) { t.Helper() event := transformSpanWithAttributes(t, input) assert.Equal(t, expected, event.Service.Target) @@ -1194,9 +1186,9 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Type: "postgresql", Name: "testsvc", - }, map[string]pdata.AttributeValue{ - "peer.service": pdata.NewAttributeValueString("testsvc"), - "db.system": pdata.NewAttributeValueString("postgresql"), + }, map[string]interface{}{ + "peer.service": "testsvc", + "db.system": "postgresql", }) }) @@ -1204,10 +1196,10 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Type: "postgresql", Name: "testdb", - }, map[string]pdata.AttributeValue{ - "peer.service": pdata.NewAttributeValueString("testsvc"), - "db.name": pdata.NewAttributeValueString("testdb"), - "db.system": pdata.NewAttributeValueString("postgresql"), + }, map[string]interface{}{ + "peer.service": "testsvc", + "db.name": "testdb", + "db.system": "postgresql", }) }) @@ -1215,8 +1207,8 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Type: "db", Name: "testdb", - }, map[string]pdata.AttributeValue{ - "db.name": pdata.NewAttributeValueString("testdb"), + }, map[string]interface{}{ + "db.name": "testdb", }) }) @@ -1224,9 +1216,9 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Name: "test-url:443", Type: "http", - }, map[string]pdata.AttributeValue{ - "peer.service": pdata.NewAttributeValueString("testsvc"), - "http.url": pdata.NewAttributeValueString("https://test-url:443/"), + }, map[string]interface{}{ + "peer.service": "testsvc", + "http.url": "https://test-url:443/", }) }) @@ -1234,10 +1226,10 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Name: "test-url:443", Type: "http", - }, map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "http.host": pdata.NewAttributeValueString("test-url:443"), - "http.target": pdata.NewAttributeValueString("/"), + }, map[string]interface{}{ + "http.scheme": "https", + "http.host": "test-url:443", + "http.target": "/", }) }) @@ -1245,12 +1237,12 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Name: "test-url:443", Type: "http", - }, map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "net.peer.name": pdata.NewAttributeValueString("test-url"), - "net.peer.ip": pdata.NewAttributeValueString("::1"), // net.peer.name preferred - "net.peer.port": pdata.NewAttributeValueInt(443), - "http.target": pdata.NewAttributeValueString("/"), + }, map[string]interface{}{ + "http.scheme": "https", + "net.peer.name": "test-url", + "net.peer.ip": "::1", // net.peer.name preferred + "net.peer.port": 443, + "http.target": "/", }) }) @@ -1258,11 +1250,11 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Name: "[::1]:443", Type: "http", - }, map[string]pdata.AttributeValue{ - "http.scheme": pdata.NewAttributeValueString("https"), - "net.peer.ip": pdata.NewAttributeValueString("::1"), // net.peer.name preferred - "net.peer.port": pdata.NewAttributeValueInt(443), - "http.target": pdata.NewAttributeValueString("/"), + }, map[string]interface{}{ + "http.scheme": "https", + "net.peer.ip": "::1", // net.peer.name preferred + "net.peer.port": 443, + "http.target": "/", }) }) @@ -1270,9 +1262,9 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Name: "testsvc", Type: "grpc", - }, map[string]pdata.AttributeValue{ - "peer.service": pdata.NewAttributeValueString("testsvc"), - "rpc.system": pdata.NewAttributeValueString("grpc"), + }, map[string]interface{}{ + "peer.service": "testsvc", + "rpc.system": "grpc", }) }) @@ -1280,10 +1272,10 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Name: "test", Type: "grpc", - }, map[string]pdata.AttributeValue{ - "peer.service": pdata.NewAttributeValueString("testsvc"), - "rpc.system": pdata.NewAttributeValueString("grpc"), - "rpc.service": pdata.NewAttributeValueString("test"), + }, map[string]interface{}{ + "peer.service": "testsvc", + "rpc.system": "grpc", + "rpc.service": "test", }) }) @@ -1291,8 +1283,8 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Name: "test", Type: "external", - }, map[string]pdata.AttributeValue{ - "rpc.service": pdata.NewAttributeValueString("test"), + }, map[string]interface{}{ + "rpc.service": "test", }) }) @@ -1300,10 +1292,10 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Name: "myTopic", Type: "kafka", - }, map[string]pdata.AttributeValue{ - "peer.service": pdata.NewAttributeValueString("testsvc"), - "messaging.system": pdata.NewAttributeValueString("kafka"), - "messaging.destination": pdata.NewAttributeValueString("myTopic"), + }, map[string]interface{}{ + "peer.service": "testsvc", + "messaging.system": "kafka", + "messaging.destination": "myTopic", }) }) @@ -1311,11 +1303,11 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Name: "testsvc", Type: "kafka", - }, map[string]pdata.AttributeValue{ - "peer.service": pdata.NewAttributeValueString("testsvc"), - "messaging.temp_destination": pdata.NewAttributeValueBool(true), - "messaging.system": pdata.NewAttributeValueString("kafka"), - "messaging.destination": pdata.NewAttributeValueString("myTopic"), + }, map[string]interface{}{ + "peer.service": "testsvc", + "messaging.temp_destination": true, + "messaging.system": "kafka", + "messaging.destination": "myTopic", }) }) @@ -1323,8 +1315,8 @@ func TestServiceTarget(t *testing.T) { test(t, &model.ServiceTarget{ Name: "myTopic", Type: "messaging", - }, map[string]pdata.AttributeValue{ - "messaging.destination": pdata.NewAttributeValueString("myTopic"), + }, map[string]interface{}{ + "messaging.destination": "myTopic", }) }) } @@ -1451,39 +1443,39 @@ func jaegerKeyValue(k string, v interface{}) jaegermodel.KeyValue { return kv } -func transformTransactionWithAttributes(t *testing.T, attrs map[string]pdata.AttributeValue, configFns ...func(pdata.Span)) model.APMEvent { +func transformTransactionWithAttributes(t *testing.T, attrs map[string]interface{}, configFns ...func(ptrace.Span)) model.APMEvent { traces, spans := newTracesSpans() otelSpan := spans.Spans().AppendEmpty() - otelSpan.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2})) + otelSpan.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelSpan.SetSpanID(pcommon.NewSpanID([8]byte{2})) for _, fn := range configFns { fn(otelSpan) } - pdata.NewAttributeMapFromMap(attrs).CopyTo(otelSpan.Attributes()) + pcommon.NewMapFromRaw(attrs).CopyTo(otelSpan.Attributes()) events := transformTraces(t, traces) return events[0] } -func transformSpanWithAttributes(t *testing.T, attrs map[string]pdata.AttributeValue, configFns ...func(pdata.Span)) model.APMEvent { +func transformSpanWithAttributes(t *testing.T, attrs map[string]interface{}, configFns ...func(ptrace.Span)) model.APMEvent { traces, spans := newTracesSpans() otelSpan := spans.Spans().AppendEmpty() - otelSpan.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2})) - otelSpan.SetParentSpanID(pdata.NewSpanID([8]byte{3})) + otelSpan.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelSpan.SetSpanID(pcommon.NewSpanID([8]byte{2})) + otelSpan.SetParentSpanID(pcommon.NewSpanID([8]byte{3})) for _, fn := range configFns { fn(otelSpan) } - pdata.NewAttributeMapFromMap(attrs).CopyTo(otelSpan.Attributes()) + pcommon.NewMapFromRaw(attrs).CopyTo(otelSpan.Attributes()) events := transformTraces(t, traces) return events[0] } -func transformTransactionSpanEvents(t *testing.T, language string, spanEvents ...pdata.SpanEvent) (transaction model.APMEvent, events []model.APMEvent) { +func transformTransactionSpanEvents(t *testing.T, language string, spanEvents ...ptrace.SpanEvent) (transaction model.APMEvent, events []model.APMEvent) { traces, spans := newTracesSpans() traces.ResourceSpans().At(0).Resource().Attributes().InsertString(semconv.AttributeTelemetrySDKLanguage, language) otelSpan := spans.Spans().AppendEmpty() - otelSpan.SetTraceID(pdata.NewTraceID([16]byte{1})) - otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2})) + otelSpan.SetTraceID(pcommon.NewTraceID([16]byte{1})) + otelSpan.SetSpanID(pcommon.NewSpanID([8]byte{2})) for _, spanEvent := range spanEvents { spanEvent.CopyTo(otelSpan.Events().AppendEmpty()) } @@ -1493,7 +1485,7 @@ func transformTransactionSpanEvents(t *testing.T, language string, spanEvents .. return allEvents[0], allEvents[1:] } -func transformTraces(t *testing.T, traces pdata.Traces) model.Batch { +func transformTraces(t *testing.T, traces ptrace.Traces) model.Batch { var processed model.Batch processor := model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error { if processed != nil { @@ -1506,11 +1498,11 @@ func transformTraces(t *testing.T, traces pdata.Traces) model.Batch { return processed } -func newTracesSpans() (pdata.Traces, pdata.InstrumentationLibrarySpans) { - traces := pdata.NewTraces() +func newTracesSpans() (ptrace.Traces, ptrace.ScopeSpans) { + traces := ptrace.NewTraces() resourceSpans := traces.ResourceSpans().AppendEmpty() - librarySpans := resourceSpans.InstrumentationLibrarySpans().AppendEmpty() - return traces, librarySpans + scopeSpans := resourceSpans.ScopeSpans().AppendEmpty() + return traces, scopeSpans } func newInt(v int) *int { diff --git a/testdata/jaeger/batch_0.approved.json b/testdata/jaeger/batch_0.approved.json index 703a4467d32..8d591e671c4 100644 --- a/testdata/jaeger/batch_0.approved.json +++ b/testdata/jaeger/batch_0.approved.json @@ -65,10 +65,10 @@ ] }, "labels": { - "event": "baggage", "key": "customer", "value": "Japanese Desserts" }, + "message": "baggage", "processor": { "event": "log", "name": "log" @@ -98,10 +98,10 @@ ] }, "labels": { - "event": "Searching for nearby drivers", "level": "info", "location": "728,326" }, + "message": "Searching for nearby drivers", "processor": { "event": "log", "name": "log" @@ -275,9 +275,9 @@ ] }, "labels": { - "event": "Search successful", "level": "info" }, + "message": "Search successful", "numeric_labels": { "num_drivers": 10 }, diff --git a/testdata/jaeger/batch_1.approved.json b/testdata/jaeger/batch_1.approved.json index 4c3ee722627..eac4a70bef6 100644 --- a/testdata/jaeger/batch_1.approved.json +++ b/testdata/jaeger/batch_1.approved.json @@ -60,9 +60,9 @@ ] }, "labels": { - "event": "Found drivers", "level": "info" }, + "message": "Found drivers", "parent": { "id": "7be2fd98d0973be3" },